Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / collection.py
index f5ed2f36e194a848d4411ee78b94180636ac462f..20f5c40bfccdf8eb945db95bf575e6f2396782f7 100644 (file)
@@ -8,7 +8,7 @@ import time
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
@@ -17,6 +17,7 @@ import config
 import errors
 import util
 import events
+from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
@@ -647,6 +648,9 @@ DEL = "del"
 MOD = "mod"
 
 class SynchronizedCollectionBase(CollectionBase):
+    """Base class for Collections and Subcollections.  Implements the majority of
+    functionality relating to accessing items in the Collection."""
+
     def __init__(self, parent=None):
         self.parent = parent
         self._modified = True
@@ -661,19 +665,19 @@ class SynchronizedCollectionBase(CollectionBase):
     def _my_block_manager(self):
         raise NotImplementedError()
 
-    def _root_lock(self):
-        raise NotImplementedError()
-
     def _populate(self):
         raise NotImplementedError()
 
     def sync_mode(self):
         raise NotImplementedError()
 
+    def root_collection(self):
+        raise NotImplementedError()
+
     def notify(self, event, collection, name, item):
         raise NotImplementedError()
 
-    @_synchronized
+    @synchronized
     def find(self, path, create=False, create_collection=False):
         """Recursively search the specified file path.  May return either a Collection
         or ArvadosFile.
@@ -768,7 +772,7 @@ class SynchronizedCollectionBase(CollectionBase):
         else:
             return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
 
-    @_synchronized
+    @synchronized
     def modified(self):
         """Test if the collection (or any subcollection or file) has been modified
         since it was created."""
@@ -779,60 +783,60 @@ class SynchronizedCollectionBase(CollectionBase):
                 return True
         return False
 
-    @_synchronized
+    @synchronized
     def set_unmodified(self):
         """Recursively clear modified flag"""
         self._modified = False
         for k,v in self._items.items():
             v.set_unmodified()
 
-    @_synchronized
+    @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
         return self._items.keys().__iter__()
 
-    @_synchronized
+    @synchronized
     def iterkeys(self):
         """Iterate over names of files and collections directly contained in this collection."""
         return self._items.keys()
 
-    @_synchronized
+    @synchronized
     def __getitem__(self, k):
         """Get a file or collection that is directly contained by this collection.  If
         you want to search a path, use `find()` instead.
         """
         return self._items[k]
 
-    @_synchronized
+    @synchronized
     def __contains__(self, k):
         """If there is a file or collection a directly contained by this collection
         with name "k"."""
         return k in self._items
 
-    @_synchronized
+    @synchronized
     def __len__(self):
         """Get the number of items directly contained in this collection"""
         return len(self._items)
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
         self._modified = True
         self.notify(DEL, self, p, None)
 
-    @_synchronized
+    @synchronized
     def keys(self):
         """Get a list of names of files and collections directly contained in this collection."""
         return self._items.keys()
 
-    @_synchronized
+    @synchronized
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
         return self._items.values()
 
-    @_synchronized
+    @synchronized
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
         return self._items.items()
@@ -841,8 +845,8 @@ class SynchronizedCollectionBase(CollectionBase):
         """Test if there is a file or collection at "path" """
         return self.find(path) != None
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def remove(self, path, rm_r=False):
         """Remove the file or subcollection (directory) at `path`.
         :rm_r:
@@ -877,8 +881,8 @@ class SynchronizedCollectionBase(CollectionBase):
     def clone(self):
         raise NotImplementedError()
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def copy(self, source, target_path, source_collection=None, overwrite=False):
         """Copy a file or subcollection to a new path in this collection.
 
@@ -943,7 +947,7 @@ class SynchronizedCollectionBase(CollectionBase):
         else:
             self.notify(ADD, target_dir, target_name, dup)
 
-    @_synchronized
+    @synchronized
     def manifest_text(self, strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
@@ -966,7 +970,7 @@ class SynchronizedCollectionBase(CollectionBase):
             else:
                 return self._manifest_text
 
-    @_synchronized
+    @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
         """
         Generate list of add/modify/delete actions which, when given to `apply`, will
@@ -974,7 +978,7 @@ class SynchronizedCollectionBase(CollectionBase):
         """
         changes = []
         if holding_collection is None:
-            holding_collection = Collection()
+            holding_collection = CollectionRoot(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
         for k in self:
             if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
@@ -988,8 +992,8 @@ class SynchronizedCollectionBase(CollectionBase):
                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
         return changes
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def apply(self, changes):
         """
         Apply changes from `diff`.  If a change conflicts with a local change, it
@@ -1011,13 +1015,13 @@ class SynchronizedCollectionBase(CollectionBase):
                     self.copy(initial, conflictpath)
             elif c[0] == MOD:
                 if local == initial:
-                    # Local matches the "initial" item so assume it hasn't
+                    # Local matches the "initial" item so it has not
                     # changed locally and is safe to update.
                     if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
                         # Replace contents of local file with new contents
                         local.replace_contents(c[3])
                     else:
-                        # Overwrite path with new item; this can happen if if
+                        # Overwrite path with new item; this can happen if
                         # path was a file and is now a collection or vice versa
                         self.copy(c[3], path, overwrite=True)
                 else:
@@ -1036,7 +1040,7 @@ class SynchronizedCollectionBase(CollectionBase):
         stripped = self.manifest_text(strip=True)
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
-    @_synchronized
+    @synchronized
     def __eq__(self, other):
         if other is self:
             return True
@@ -1054,32 +1058,63 @@ class SynchronizedCollectionBase(CollectionBase):
     def __ne__(self, other):
         return not self.__eq__(other)
 
-class Collection(SynchronizedCollectionBase):
-    """Store an Arvados collection, consisting of a set of files and
-    sub-collections.
+class CollectionRoot(SynchronizedCollectionBase):
+    """Represents the root of an Arvados Collection, which may be associated with
+    an API server Collection record.
+
+    Brief summary of useful methods:
+
+    :To read an existing file:
+      `c.open("myfile", "r")`
+
+    :To write a new file:
+      `c.open("myfile", "w")`
+
+    :To determine if a file exists:
+      `c.find("myfile") is not None`
+
+    :To copy a file:
+      `c.copy("source", "dest")`
+
+    :To delete a file:
+      `c.remove("myfile")`
+
+    :To save to an existing collection record:
+      `c.save()`
+
+    :To save a new collection record:
+      `c.save_new()`
+
+    :To merge remote changes into this object:
+      `c.update()`
+
+    This class is threadsafe.  The root collection object, all subcollections
+    and files are protected by a single lock (i.e. each access locks the entire
+    collection).
+
     """
 
     def __init__(self, manifest_locator_or_text=None,
                  parent=None,
-                 config=None,
+                 apiconfig=None,
                  api_client=None,
                  keep_client=None,
                  num_retries=None,
                  block_manager=None,
-                 sync=SYNC_READONLY):
+                 sync=None):
         """:manifest_locator_or_text:
           One of Arvados collection UUID, block locator of
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
-        :config:
-          the arvados configuration to get the hostname and api token.
+        :apiconfig:
+          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
         :api_client:
-          The API client object to use for requests.  If not specified, create one using `config`.
+          The API client object to use for requests.  If not specified, create one using `apiconfig`.
         :keep_client:
-          the Keep client to use for requests.  If not specified, create one using `config`.
+          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
         :num_retries:
           the number of retries for API and Keep requests.
         :block_manager:
@@ -1090,21 +1125,30 @@ class Collection(SynchronizedCollectionBase):
             Collection is read only.  No synchronization.  This mode will
             also forego locking, which gives better performance.
           :SYNC_EXPLICIT:
-            Synchronize on explicit request via `update()` or `save()`
+            Collection is writable.  Synchronize on explicit request via `update()` or `save()`
           :SYNC_LIVE:
-            Synchronize with server in response to background websocket events,
-            on block write, or on file close.
+            Collection is writable.  Synchronize with server in response to
+            background websocket events, on block write, or on file close.
 
         """
-        super(Collection, self).__init__(parent)
+        super(CollectionRoot, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
-        self._config = config
+
+        if apiconfig:
+            self._config = apiconfig
+        else:
+            self._config = config.settings()
+
         self.num_retries = num_retries
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
+
+        if sync is None:
+            raise errors.ArgumentError("Must specify sync mode")
+
         self._sync = sync
         self.lock = threading.RLock()
         self.callbacks = []
@@ -1124,41 +1168,51 @@ class Collection(SynchronizedCollectionBase):
             self._populate()
 
             if self._sync == SYNC_LIVE:
-                if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
-                    raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
-                self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
+                if not self._has_collection_uuid():
+                    raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
+                self.events = events.subscribe(arvados.api(apiconfig=self._config),
+                                               [["object_uuid", "=", self._manifest_locator]],
+                                               self.on_message)
 
-    @staticmethod
-    def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
-        c = Collection(sync=sync)
-        c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
-        return c
 
-    def _root_lock(self):
-        return self.lock
+    def root_collection(self):
+        return self
 
     def sync_mode(self):
         return self._sync
 
-    def on_message(self):
-        self.update()
+    def on_message(self, event):
+        if event.get("object_uuid") == self._manifest_locator:
+            self.update()
+
+    @staticmethod
+    def create(name, owner_uuid=None, sync=SYNC_EXPLICIT, apiconfig=None):
+        """Create a new empty collection with an associated collection record; subscribe to its events if `sync` is SYNC_LIVE."""
+        c = CollectionRoot(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
+        c.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
+        if sync == SYNC_LIVE:
+            c.events = events.subscribe(arvados.api(apiconfig=c._config), [["object_uuid", "=", c._manifest_locator]], c.on_message)
+        return c
 
-    @_synchronized
-    def update(self, other=None):
+    @synchronized
+    @retry_method
+    def update(self, other=None, num_retries=None):
         if other is None:
-            n = self._my_api().collections().get(uuid=self._manifest_locator).execute()
+            if self._manifest_locator is None:
+                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
+            n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
             other = import_collection(n["manifest_text"])
         baseline = import_collection(self._manifest_text)
         self.apply(other.diff(baseline))
 
-    @_synchronized
+    @synchronized
     def _my_api(self):
         if self._api_client is None:
             self._api_client = arvados.SafeApi(self._config)
             self._keep_client = self._api_client.keep
         return self._api_client
 
-    @_synchronized
+    @synchronized
     def _my_keep(self):
         if self._keep_client is None:
             if self._api_client is None:
@@ -1167,7 +1221,7 @@ class Collection(SynchronizedCollectionBase):
                 self._keep_client = KeepClient(api=self._api_client)
         return self._keep_client
 
-    @_synchronized
+    @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
             self._block_manager = BlockManager(self._my_keep())
@@ -1239,28 +1293,31 @@ class Collection(SynchronizedCollectionBase):
             # forego any further locking.
             self.lock = NoopLock()
 
+    def _has_collection_uuid(self):
+        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block"""
-        if self._sync != SYNC_READONLY:
-            self.save(allow_no_locator=True)
+        if self._sync != SYNC_READONLY and self._has_collection_uuid():
+            self.save()
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
-    @_synchronized
+    @synchronized
     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
         if new_config is None:
             new_config = self._config
-        c = Collection(parent=new_parent, config=new_config, sync=new_sync)
+        c = CollectionRoot(parent=new_parent, apiconfig=new_config, sync=new_sync)
         if new_sync == SYNC_READONLY:
             c.lock = NoopLock()
         c._items = {}
         self._cloneinto(c)
         return c
 
-    @_synchronized
+    @synchronized
     def api_response(self):
         """
         api_response() -> dict or None
@@ -1271,38 +1328,53 @@ class Collection(SynchronizedCollectionBase):
         """
         return self._api_response
 
-    @_must_be_writable
-    @_synchronized
-    def save(self, allow_no_locator=False):
-        """Commit pending buffer blocks to Keep, write the manifest to Keep, and
-        update the collection record to Keep.
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save(self, merge=True, num_retries=None):
+        """Commit pending buffer blocks to Keep, merge with remote record (if
+        merge=True), write the manifest to Keep, and update the collection
+        record.  Will raise AssertionError if not associated with a collection
+        record on the API server.  To save a manifest to Keep only, see `save_new()`.
+
+        :merge:
+          Merge remote changes (via `update()`) before saving.  If False, remote changes are overwritten.
+        :num_retries:
+          Number of retries for the Keep and API requests made here.
 
-        :allow_no_locator:
-          If there is no collection uuid associated with this
-          Collection and `allow_no_locator` is False, raise an error.  If True,
-          do not raise an error.
         """
         if self.modified():
-            self._my_block_manager().commit_all()
-            self._my_keep().put(self.manifest_text(strip=True))
-            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
-                self._api_response = self._my_api().collections().update(
-                    uuid=self._manifest_locator,
-                    body={'manifest_text': self.manifest_text(strip=False)}
-                    ).execute(
-                        num_retries=self.num_retries)
-            elif not allow_no_locator:
+            if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
+            self._my_block_manager().commit_all()
+            if merge:
+                self.update(num_retries=num_retries)
+            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
+
+            mt = self.manifest_text(strip=False)
+            self._api_response = self._my_api().collections().update(
+                uuid=self._manifest_locator,
+                body={'manifest_text': mt}
+                ).execute(
+                    num_retries=num_retries)
+            self._manifest_text = mt
             self.set_unmodified()
 
-    @_must_be_writable
-    @_synchronized
-    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
-        """Save a new collection record.
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
+        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
+        a new collection record (if create_collection_record True).  After
+        creating a new collection record, this Collection object will be
+        associated with the new record for `save()` and SYNC_LIVE updates.
 
         :name:
           The collection name.
 
+        :create_collection_record:
+          If False, only save the manifest to Keep; do not create a collection record.
+
         :owner_uuid:
           the user, or project uuid that will own this collection.
           If None, defaults to the current user.
@@ -1314,69 +1386,89 @@ class Collection(SynchronizedCollectionBase):
 
         """
         self._my_block_manager().commit_all()
-        self._my_keep().put(self.manifest_text(strip=True))
-        body = {"manifest_text": self.manifest_text(strip=False),
-                "name": name}
-        if owner_uuid:
-            body["owner_uuid"] = owner_uuid
-        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
+        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
+        mt = self.manifest_text(strip=False)
+
+        if create_collection_record:
+            if name is None:
+                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+
+            body = {"manifest_text": mt,
+                    "name": name}
+            if owner_uuid:
+                body["owner_uuid"] = owner_uuid
+
+            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
 
-        if self.events:
-            self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+            if self.events:
+                self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
 
-        self._manifest_locator = self._api_response["uuid"]
+            self._manifest_locator = self._api_response["uuid"]
 
-        if self.events:
-            self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+            if self.events:
+                self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
 
+        self._manifest_text = mt
         self.set_unmodified()
 
-    @_synchronized
+    @synchronized
     def subscribe(self, callback):
         self.callbacks.append(callback)
 
-    @_synchronized
+    @synchronized
     def unsubscribe(self, callback):
         self.callbacks.remove(callback)
 
-    @_synchronized
+    @synchronized
     def notify(self, event, collection, name, item):
         for c in self.callbacks:
             c(event, collection, name, item)
 
+def ReadOnlyCollection(*args, **kwargs):
+    kwargs["sync"] = SYNC_READONLY
+    return CollectionRoot(*args, **kwargs)
+
+def WritableCollection(*args, **kwargs):
+    kwargs["sync"] = SYNC_EXPLICIT
+    return CollectionRoot(*args, **kwargs)
+
+def LiveCollection(*args, **kwargs):
+    kwargs["sync"] = SYNC_LIVE
+    return CollectionRoot(*args, **kwargs)
+
+
 class Subcollection(SynchronizedCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.  It falls under the umbrella of the root collection."""
 
     def __init__(self, parent):
         super(Subcollection, self).__init__(parent)
-        self.lock = parent._root_lock()
+        self.lock = self.root_collection().lock
 
-    def _root_lock(self):
-        return self.parent._root_lock()
+    def root_collection(self):
+        return self.parent.root_collection()
 
     def sync_mode(self):
-        return self.parent.sync_mode()
+        return self.root_collection().sync_mode()
 
     def _my_api(self):
-        return self.parent._my_api()
+        return self.root_collection()._my_api()
 
     def _my_keep(self):
-        return self.parent._my_keep()
+        return self.root_collection()._my_keep()
 
     def _my_block_manager(self):
-        return self.parent._my_block_manager()
+        return self.root_collection()._my_block_manager()
 
     def _populate(self):
-        self.parent._populate()
+        self.root_collection()._populate()
 
     def notify(self, event, collection, name, item):
-        self.parent.notify(event, collection, name, item)
+        return self.root_collection().notify(event, collection, name, item)
 
-    @_synchronized
+    @synchronized
     def clone(self, new_parent):
         c = Subcollection(new_parent)
-        c._items = {}
         self._cloneinto(c)
         return c
 
@@ -1412,7 +1504,7 @@ def import_manifest(manifest_text,
             raise ArgumentError("Can only import manifest into an empty collection")
         c = into_collection
     else:
-        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
+        c = CollectionRoot(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
 
     save_sync = c.sync_mode()
     c._sync = None