Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / collection.py
index dd4946400161be7c87397d5ca9e2543aedf7fe32..20f5c40bfccdf8eb945db95bf575e6f2396782f7 100644 (file)
@@ -8,14 +8,16 @@ import time
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
+from .safeapi import SafeApi
 import config
 import errors
 import util
 import events
+from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
@@ -643,10 +645,15 @@ class ResumableCollectionWriter(CollectionWriter):
 
 ADD = "add"
 DEL = "del"
+MOD = "mod"
 
 class SynchronizedCollectionBase(CollectionBase):
+    """Base class for Collections and Subcollections.  Implements the majority of
+    functionality relating to accessing items in the Collection."""
+
     def __init__(self, parent=None):
         self.parent = parent
+        self._modified = True
         self._items = {}
 
     def _my_api(self):
@@ -658,19 +665,19 @@ class SynchronizedCollectionBase(CollectionBase):
     def _my_block_manager(self):
         raise NotImplementedError()
 
-    def _root_lock(self):
-        raise NotImplementedError()
-
     def _populate(self):
         raise NotImplementedError()
 
     def sync_mode(self):
         raise NotImplementedError()
 
-    def notify(self, collection, event, name, item):
+    def root_collection(self):
+        raise NotImplementedError()
+
+    def notify(self, event, collection, name, item):
         raise NotImplementedError()
 
-    @_synchronized
+    @synchronized
     def find(self, path, create=False, create_collection=False):
         """Recursively search the specified file path.  May return either a Collection
         or ArvadosFile.
@@ -695,7 +702,7 @@ class SynchronizedCollectionBase(CollectionBase):
         if p[0] == '.':
             del p[0]
 
-        if len(p) > 0:
+        if p and p[0]:
             item = self._items.get(p[0])
             if len(p) == 1:
                 # item must be a file
@@ -706,16 +713,21 @@ class SynchronizedCollectionBase(CollectionBase):
                     else:
                         item = ArvadosFile(self)
                     self._items[p[0]] = item
-                    self.notify(self, ADD, p[0], item)
+                    self._modified = True
+                    self.notify(ADD, self, p[0], item)
                 return item
             else:
                 if item is None and create:
                     # create new collection
                     item = Subcollection(self)
                     self._items[p[0]] = item
-                    self.notify(self, ADD, p[0], item)
+                    self._modified = True
+                    self.notify(ADD, self, p[0], item)
                 del p[0]
-                return item.find("/".join(p), create=create)
+                if isinstance(item, SynchronizedCollectionBase):
+                    return item.find("/".join(p), create=create)
+                else:
+                    raise errors.ArgumentError("Interior path components must be subcollection")
         else:
             return self
 
@@ -760,67 +772,71 @@ class SynchronizedCollectionBase(CollectionBase):
         else:
             return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
 
-    @_synchronized
+    @synchronized
     def modified(self):
         """Test if the collection (or any subcollection or file) has been modified
         since it was created."""
+        if self._modified:
+            return True
         for k,v in self._items.items():
             if v.modified():
                 return True
         return False
 
-    @_synchronized
+    @synchronized
     def set_unmodified(self):
         """Recursively clear modified flag"""
+        self._modified = False
         for k,v in self._items.items():
             v.set_unmodified()
 
-    @_synchronized
+    @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
-        return self._items.keys()
+        return self._items.keys().__iter__()
 
-    @_synchronized
+    @synchronized
     def iterkeys(self):
         """Iterate over names of files and collections directly contained in this collection."""
         return self._items.keys()
 
-    @_synchronized
+    @synchronized
     def __getitem__(self, k):
         """Get a file or collection that is directly contained by this collection.  If
         you want to search a path, use `find()` instead.
         """
         return self._items[k]
 
-    @_synchronized
+    @synchronized
     def __contains__(self, k):
         """If there is a file or collection a directly contained by this collection
         with name "k"."""
         return k in self._items
 
-    @_synchronized
+    @synchronized
     def __len__(self):
         """Get the number of items directly contained in this collection"""
         return len(self._items)
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self.notify(self, DEL, p, None)
+        self._modified = True
+        self.notify(DEL, self, p, None)
 
-    @_synchronized
+    @synchronized
     def keys(self):
         """Get a list of names of files and collections directly contained in this collection."""
         return self._items.keys()
 
-    @_synchronized
+    @synchronized
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
         return self._items.values()
 
-    @_synchronized
+    @synchronized
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
         return self._items.items()
@@ -829,8 +845,8 @@ class SynchronizedCollectionBase(CollectionBase):
         """Test if there is a file or collection at "path" """
         return self.find(path) != None
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def remove(self, path, rm_r=False):
         """Remove the file or subcollection (directory) at `path`.
         :rm_r:
@@ -848,8 +864,10 @@ class SynchronizedCollectionBase(CollectionBase):
             if len(p) == 1:
                 if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
+                d = self._items[p[0]]
                 del self._items[p[0]]
-                self.notify(self, DEL, p[0], None)
+                self._modified = True
+                self.notify(DEL, self, p[0], d)
             else:
                 del p[0]
                 item.remove("/".join(p))
@@ -857,47 +875,79 @@ class SynchronizedCollectionBase(CollectionBase):
             raise IOError((errno.ENOENT, "File not found"))
 
     def _cloneinto(self, target):
-        for k,v in self._items:
-            target._items[k] = v.clone(new_parent=target)
+        for k,v in self._items.items():
+            target._items[k] = v.clone(target)
 
     def clone(self):
         raise NotImplementedError()
 
-    @_must_be_writable
-    @_synchronized
-    def copyto(self, target_path, source_path, source_collection=None, overwrite=False):
-        """
-        copyto('/foo', '/bar') will overwrite 'foo' if it exists.
-        copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo'
+    @must_be_writable
+    @synchronized
+    def copy(self, source, target_path, source_collection=None, overwrite=False):
+        """Copy a file or subcollection to a new path in this collection.
+
+        :source:
+          An ArvadosFile, Subcollection, or string with a path to source file or subcollection
+
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
+
+        :source_collection:
+          Collection to copy `source_path` from (default `self`)
+
+        :overwrite:
+          Whether to overwrite target file if it already exists.
         """
         if source_collection is None:
             source_collection = self
 
         # Find the object to copy
-        sp = source_path.split("/")
-        source_obj = source_collection.find(source_path)
-        if source_obj is None:
-            raise IOError((errno.ENOENT, "File not found"))
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError((errno.ENOENT, "File not found"))
+            sp = source.split("/")
+        else:
+            source_obj = source
+            sp = None
 
         # Find parent collection the target path
         tp = target_path.split("/")
-        target_dir = self.find(tp[0:-1].join("/"), create=True, create_collection=True)
 
         # Determine the name to use.
-        target_name = tp[-1] if tp[-1] else sp[-1]
+        target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
+
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 
-        if target_name in target_dir and not overwrite:
-            raise IOError((errno.EEXIST, "File already exists"))
+        target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
 
-        # Actually make the copy.
-        dup = source_obj.clone(target_dir)
         with target_dir.lock:
+            if target_name in target_dir:
+                if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
+                    target_dir = target_dir[target_name]
+                    target_name = sp[-1]
+                elif not overwrite:
+                    raise IOError((errno.EEXIST, "File already exists"))
+
+            mod = None
+            if target_name in target_dir:
+                mod = target_dir[target_name]
+
+            # Actually make the copy.
+            dup = source_obj.clone(target_dir)
             target_dir._items[target_name] = dup
+            target_dir._modified = True
 
-        self.notify(target_dir, ADD, target_name, dup)
-
+        if mod:
+            self.notify(MOD, target_dir, target_name, (mod, dup))
+        else:
+            self.notify(ADD, target_dir, target_name, dup)
 
-    @_synchronized
+    @synchronized
     def manifest_text(self, strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
@@ -920,55 +970,151 @@ class SynchronizedCollectionBase(CollectionBase):
             else:
                 return self._manifest_text
 
-    @_must_be_writable
-    @_synchronized
-    def merge(self, other):
-        for k in other.keys():
+    @synchronized
+    def diff(self, end_collection, prefix=".", holding_collection=None):
+        """
+        Generate list of add/modify/delete actions which, when given to `apply`, will
+        change `self` to match `end_collection`
+        """
+        changes = []
+        if holding_collection is None:
+            holding_collection = CollectionRoot(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
+        for k in self:
+            if k not in end_collection:
+               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
+        for k in end_collection:
             if k in self:
-                if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection):
-                    self[k].merge(other[k])
-                else:
-                    if self[k] != other[k]:
-                        name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d~%H:%M%:%S",
-                                                                     time.gmtime()))
-                        self[name] = other[k].clone(self)
-                        self.notify(self, name, ADD, self[name])
+                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
+                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
+                elif end_collection[k] != self[k]:
+                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
             else:
-                self[k] = other[k].clone(self)
-                self.notify(self, k, ADD, self[k])
+                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
+        return changes
+
+    @must_be_writable
+    @synchronized
+    def apply(self, changes):
+        """
+        Apply changes from `diff`.  If a change conflicts with a local change, it
+        will be saved to an alternate path indicating the conflict.
+        """
+        for c in changes:
+            path = c[1]
+            initial = c[2]
+            local = self.find(path)
+            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
+                                                                    time.gmtime()))
+            if c[0] == ADD:
+                if local is None:
+                    # No local file at path, safe to copy over new file
+                    self.copy(initial, path)
+                elif local is not None and local != initial:
+                    # There is already local file and it is different:
+                    # save change to conflict file.
+                    self.copy(initial, conflictpath)
+            elif c[0] == MOD:
+                if local == initial:
+                    # Local matches the "initial" item so it has not
+                    # changed locally and is safe to update.
+                    if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
+                        # Replace contents of local file with new contents
+                        local.replace_contents(c[3])
+                    else:
+                        # Overwrite path with new item; this can happen if
+                        # path was a file and is now a collection or vice versa
+                        self.copy(c[3], path, overwrite=True)
+                else:
+                    # Local is missing (presumably deleted) or local doesn't
+                    # match the "start" value, so save change to conflict file
+                    self.copy(c[3], conflictpath)
+            elif c[0] == DEL:
+                if local == initial:
+                    # Local item matches "initial" value, so it is safe to remove.
+                    self.remove(path, rm_r=True)
+                # else, the file is modified or already removed, in either
+                # case we don't want to try to remove it.
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
         stripped = self.manifest_text(strip=True)
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
+    @synchronized
+    def __eq__(self, other):
+        if other is self:
+            return True
+        if not isinstance(other, SynchronizedCollectionBase):
+            return False
+        if len(self._items) != len(other):
+            return False
+        for k in self._items:
+            if k not in other:
+                return False
+            if self._items[k] != other[k]:
+                return False
+        return True
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+class CollectionRoot(SynchronizedCollectionBase):
+    """Represents the root of an Arvados Collection, which may be associated with
+    an API server Collection record.
+
+    Brief summary of useful methods:
+
+    :To read an existing file:
+      `c.open("myfile", "r")`
+
+    :To write a new file:
+      `c.open("myfile", "w")`
+
+    :To determine if a file exists:
+      `c.find("myfile") is not None`
+
+    :To copy a file:
+      `c.copy("source", "dest")`
+
+    :To delete a file:
+      `c.remove("myfile")`
+
+    :To save to an existing collection record:
+      `c.save()`
+
+    :To save a new collection record:
+    `c.save_new()`
+
+    :To merge remote changes into this object:
+      `c.update()`
+
+    This class is threadsafe.  The root collection object, all subcollections
+    and files are protected by a single lock (i.e. each access locks the entire
+    collection).
 
-class Collection(SynchronizedCollectionBase):
-    """Store an Arvados collection, consisting of a set of files and
-    sub-collections.
     """
 
     def __init__(self, manifest_locator_or_text=None,
                  parent=None,
-                 config=None,
+                 apiconfig=None,
                  api_client=None,
                  keep_client=None,
                  num_retries=None,
                  block_manager=None,
-                 sync=SYNC_READONLY):
+                 sync=None):
         """:manifest_locator_or_text:
           One of Arvados collection UUID, block locator of
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
-        :config:
-          the arvados configuration to get the hostname and api token.
+        :apiconfig:
+          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
         :api_client:
-          The API client object to use for requests.  If not specified, create one using `config`.
+          The API client object to use for requests.  If not specified, create one using `apiconfig`.
         :keep_client:
-          the Keep client to use for requests.  If not specified, create one using `config`.
+          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
         :num_retries:
           the number of retries for API and Keep requests.
         :block_manager:
@@ -979,21 +1125,30 @@ class Collection(SynchronizedCollectionBase):
             Collection is read only.  No synchronization.  This mode will
             also forego locking, which gives better performance.
           :SYNC_EXPLICIT:
-            Synchronize on explicit request via `update()` or `save()`
+            Collection is writable.  Synchronize on explicit request via `update()` or `save()`
           :SYNC_LIVE:
-            Synchronize with server in response to background websocket events,
-            on block write, or on file close.
+            Collection is writable.  Synchronize with server in response to
+            background websocket events, on block write, or on file close.
 
         """
-        super(Collection, self).__init__(parent)
+        super(CollectionRoot, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
-        self._config = config
+
+        if apiconfig:
+            self._config = apiconfig
+        else:
+            self._config = config.settings()
+
         self.num_retries = num_retries
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
+
+        if sync is None:
+            raise errors.ArgumentError("Must specify sync mode")
+
         self._sync = sync
         self.lock = threading.RLock()
         self.callbacks = []
@@ -1013,39 +1168,51 @@ class Collection(SynchronizedCollectionBase):
             self._populate()
 
             if self._sync == SYNC_LIVE:
-                if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
-                    raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
-                self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
+                if not self._has_collection_uuid():
+                    raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
+                self.events = events.subscribe(arvados.api(apiconfig=self._config),
+                                               [["object_uuid", "=", self._manifest_locator]],
+                                               self.on_message)
 
-    @staticmethod
-    def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
-        c = Collection(sync=sync)
-        c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
-        return c
 
-    def _root_lock(self):
-        return self.lock
+    def root_collection(self):
+        return self
 
     def sync_mode(self):
         return self._sync
 
-    def on_message(self):
-        self.update()
+    def on_message(self, event):
+        if event.get("object_uuid") == self._manifest_locator:
+            self.update()
 
-    @_synchronized
-    def update(self):
-        n = self._my_api().collections().get(uuid=self._manifest_locator, select=["manifest_text"]).execute()
-        other = import_collection(n["manifest_text"])
-        self.merge(other)
+    @staticmethod
+    def create(name, owner_uuid=None, sync=SYNC_EXPLICIT, apiconfig=None):
+        """Create a new empty Collection with associated collection record."""
+        c = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
+        c.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
+        if sync == SYNC_LIVE:
+            c.events = events.subscribe(arvados.api(apiconfig=self._config), [["object_uuid", "=", c._manifest_locator]], c.on_message)
+        return c
 
-    @_synchronized
+    @synchronized
+    @retry_method
+    def update(self, other=None, num_retries=None):
+        if other is None:
+            if self._manifest_locator is None:
+                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
+            n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            other = import_collection(n["manifest_text"])
+        baseline = import_collection(self._manifest_text)
+        self.apply(other.diff(baseline))
+
+    @synchronized
     def _my_api(self):
         if self._api_client is None:
-            self._api_client = arvados.api.SafeApi(self._config)
+            self._api_client = arvados.SafeApi(self._config)
             self._keep_client = self._api_client.keep
         return self._api_client
 
-    @_synchronized
+    @synchronized
     def _my_keep(self):
         if self._keep_client is None:
             if self._api_client is None:
@@ -1054,7 +1221,7 @@ class Collection(SynchronizedCollectionBase):
                 self._keep_client = KeepClient(api=self._api_client)
         return self._keep_client
 
-    @_synchronized
+    @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
             self._block_manager = BlockManager(self._my_keep())
@@ -1118,6 +1285,7 @@ class Collection(SynchronizedCollectionBase):
                     error_via_api,
                     error_via_keep))
         # populate
+        self._baseline_manifest = self._manifest_text
         import_manifest(self._manifest_text, self)
 
         if self._sync == SYNC_READONLY:
@@ -1125,28 +1293,31 @@ class Collection(SynchronizedCollectionBase):
             # forego any further locking.
             self.lock = NoopLock()
 
+    def _has_collection_uuid(self):
+        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block"""
-        if self._sync != SYNC_READONLY:
-            self.save(allow_no_locator=True)
+        if self._sync != SYNC_READONLY and self._has_collection_uuid():
+            self.save()
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
-    @_synchronized
+    @synchronized
     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
         if new_config is None:
-            new_config = self.config
-        c = Collection(parent=new_parent, config=new_config, sync=new_sync)
+            new_config = self._config
+        c = CollectionRoot(parent=new_parent, apiconfig=new_config, sync=new_sync)
         if new_sync == SYNC_READONLY:
             c.lock = NoopLock()
         c._items = {}
         self._cloneinto(c)
         return c
 
-    @_synchronized
+    @synchronized
     def api_response(self):
         """
         api_response() -> dict or None
@@ -1157,38 +1328,53 @@ class Collection(SynchronizedCollectionBase):
         """
         return self._api_response
 
-    @_must_be_writable
-    @_synchronized
-    def save(self, allow_no_locator=False):
-        """Commit pending buffer blocks to Keep, write the manifest to Keep, and
-        update the collection record to Keep.
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save(self, merge=True, num_retries=None):
+        """Commit pending buffer blocks to Keep, merge with remote record (if
+        update=True), write the manifest to Keep, and update the collection
+        record.  Will raise AssertionError if not associated with a collection
+        record on the API server.  If you want to save a manifest to Keep only,
+        see `save_new()`.
+
+        :update:
+          Update and merge remote changes before saving.  Otherwise, any
+          remote changes will be ignored and overwritten.
 
-        :allow_no_locator:
-          If there is no collection uuid associated with this
-          Collection and `allow_no_locator` is False, raise an error.  If True,
-          do not raise an error.
         """
         if self.modified():
-            self._my_block_manager().commit_all()
-            self._my_keep().put(self.manifest_text(strip=True))
-            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
-                self._api_response = self._my_api().collections().update(
-                    uuid=self._manifest_locator,
-                    body={'manifest_text': self.manifest_text(strip=False)}
-                    ).execute(
-                        num_retries=self.num_retries)
-            elif not allow_no_locator:
+            if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
+            self._my_block_manager().commit_all()
+            if merge:
+                self.update()
+            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
+
+            mt = self.manifest_text(strip=False)
+            self._api_response = self._my_api().collections().update(
+                uuid=self._manifest_locator,
+                body={'manifest_text': mt}
+                ).execute(
+                    num_retries=num_retries)
+            self._manifest_text = mt
             self.set_unmodified()
 
-    @_must_be_writable
-    @_synchronized
-    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
-        """Save a new collection record.
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
+        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
+        a new collection record (if create_collection_record True).  After
+        creating a new collection record, this Collection object will be
+        associated with the new record for `save()` and SYNC_LIVE updates.
 
         :name:
           The collection name.
 
+        :keep_only:
+          Only save the manifest to keep, do not create a collection record.
+
         :owner_uuid:
           the user, or project uuid that will own this collection.
           If None, defaults to the current user.
@@ -1200,35 +1386,56 @@ class Collection(SynchronizedCollectionBase):
 
         """
         self._my_block_manager().commit_all()
-        self._my_keep().put(self.manifest_text(strip=True))
-        body = {"manifest_text": self.manifest_text(strip=False),
-                "name": name}
-        if owner_uuid:
-            body["owner_uuid"] = owner_uuid
-        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
+        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
+        mt = self.manifest_text(strip=False)
+
+        if create_collection_record:
+            if name is None:
+                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
 
-        if self.events:
-            self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+            body = {"manifest_text": mt,
+                    "name": name}
+            if owner_uuid:
+                body["owner_uuid"] = owner_uuid
 
-        self._manifest_locator = self._api_response["uuid"]
+            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
 
-        if self.events:
-            self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+            if self.events:
+                self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
 
+            self._manifest_locator = self._api_response["uuid"]
+
+            if self.events:
+                self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+
+        self._manifest_text = mt
         self.set_unmodified()
 
-    @_synchronized
+    @synchronized
     def subscribe(self, callback):
         self.callbacks.append(callback)
 
-    @_synchronized
+    @synchronized
     def unsubscribe(self, callback):
         self.callbacks.remove(callback)
 
-    @_synchronized
-    def notify(self, collection, event, name, item):
+    @synchronized
+    def notify(self, event, collection, name, item):
         for c in self.callbacks:
-            c(collection, event, name, item)
+            c(event, collection, name, item)
+
+def ReadOnlyCollection(*args, **kwargs):
+    kwargs["sync"] = SYNC_READONLY
+    return CollectionRoot(*args, **kwargs)
+
+def WritableCollection(*args, **kwargs):
+    kwargs["sync"] = SYNC_EXPLICIT
+    return CollectionRoot(*args, **kwargs)
+
+def LiveCollection(*args, **kwargs):
+    kwargs["sync"] = SYNC_LIVE
+    return CollectionRoot(*args, **kwargs)
+
 
 class Subcollection(SynchronizedCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
@@ -1236,33 +1443,32 @@ class Subcollection(SynchronizedCollectionBase):
 
     def __init__(self, parent):
         super(Subcollection, self).__init__(parent)
-        self.lock = parent._root_lock()
+        self.lock = self.root_collection().lock
 
-    def _root_lock(self):
-        return self.parent._root_lock()
+    def root_collection(self):
+        return self.parent.root_collection()
 
     def sync_mode(self):
-        return self.parent.sync_mode()
+        return self.root_collection().sync_mode()
 
     def _my_api(self):
-        return self.parent._my_api()
+        return self.root_collection()._my_api()
 
     def _my_keep(self):
-        return self.parent._my_keep()
+        return self.root_collection()._my_keep()
 
     def _my_block_manager(self):
-        return self.parent._my_block_manager()
+        return self.root_collection()._my_block_manager()
 
     def _populate(self):
-        self.parent._populate()
+        self.root_collection()._populate()
 
-    def notify(self, collection, event, name, item):
-        self.parent.notify(collection, event, name, item)
+    def notify(self, event, collection, name, item):
+        return self.root_collection().notify(event, collection, name, item)
 
-    @_synchronized
+    @synchronized
     def clone(self, new_parent):
         c = Subcollection(new_parent)
-        c._items = {}
         self._cloneinto(c)
         return c
 
@@ -1298,7 +1504,7 @@ def import_manifest(manifest_text,
             raise ArgumentError("Can only import manifest into an empty collection")
         c = into_collection
     else:
-        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
+        c = CollectionRoot(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
 
     save_sync = c.sync_mode()
     c._sync = None