Merge branch '6152-compute-node-no-arv-wip'
[arvados.git] / sdk / python / arvados / collection.py
index 12fa9ae30d5bcd7bb4a539cc947bca881ab59302..30828732d8d4908ca0922bc780c11d2a6943578e 100644 (file)
@@ -470,13 +470,14 @@ class ResumableCollectionWriter(CollectionWriter):
                 "resumable writer can't accept unsourced data")
         return super(ResumableCollectionWriter, self).write(data)
 
                 "resumable writer can't accept unsourced data")
         return super(ResumableCollectionWriter, self).write(data)
 
+
 ADD = "add"
 DEL = "del"
 MOD = "mod"
 FILE = "file"
 COLLECTION = "collection"
 
 ADD = "add"
 DEL = "del"
 MOD = "mod"
 FILE = "file"
 COLLECTION = "collection"
 
-class SynchronizedCollectionBase(CollectionBase):
+class RichCollectionBase(CollectionBase):
     """Base class for Collections and Subcollections.
 
     Implements the majority of functionality relating to accessing items in the
     """Base class for Collections and Subcollections.
 
     Implements the majority of functionality relating to accessing items in the
@@ -550,7 +551,7 @@ class SynchronizedCollectionBase(CollectionBase):
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
-                if isinstance(item, SynchronizedCollectionBase):
+                if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
                     raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
                     raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
@@ -573,7 +574,7 @@ class SynchronizedCollectionBase(CollectionBase):
         if len(pathcomponents) == 1:
             return item
         else:
         if len(pathcomponents) == 1:
             return item
         else:
-            if isinstance(item, SynchronizedCollectionBase):
+            if isinstance(item, RichCollectionBase):
                 if pathcomponents[1]:
                     return item.find(pathcomponents[1])
                 else:
                 if pathcomponents[1]:
                     return item.find(pathcomponents[1])
                 else:
@@ -702,7 +703,7 @@ class SynchronizedCollectionBase(CollectionBase):
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
-        return self.find(path) != None
+        return self.find(path) is not None
 
     @must_be_writable
     @synchronized
 
     @must_be_writable
     @synchronized
@@ -721,7 +722,7 @@ class SynchronizedCollectionBase(CollectionBase):
         if item is None:
             raise IOError((errno.ENOENT, "File not found"))
         if len(pathcomponents) == 1:
         if item is None:
             raise IOError((errno.ENOENT, "File not found"))
         if len(pathcomponents) == 1:
-            if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
+            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                 raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
                 raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
@@ -737,13 +738,47 @@ class SynchronizedCollectionBase(CollectionBase):
     def clone(self):
         raise NotImplementedError()
 
     def clone(self):
         raise NotImplementedError()
 
+    @must_be_writable
+    @synchronized
+    def add(self, source_obj, target_name, overwrite=False):
+        """Copy a file or subcollection to this collection.
+
+        :source_obj:
+          An ArvadosFile, or Subcollection object
+
+        :target_name:
+          Destination item name.  If the target name already exists and is a
+          file, this will raise an error unless you specify `overwrite=True`.
+
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+
+        """
+
+        if target_name in self and not overwrite:
+            raise IOError((errno.EEXIST, "File already exists"))
+
+        modified_from = None
+        if target_name in self:
+            modified_from = self[target_name]
+
+        # Actually make the copy.
+        dup = source_obj.clone(self)
+        self._items[target_name] = dup
+        self._modified = True
+
+        if modified_from:
+            self.notify(MOD, self, target_name, (modified_from, dup))
+        else:
+            self.notify(ADD, self, target_name, dup)
+
     @must_be_writable
     @synchronized
     def copy(self, source, target_path, source_collection=None, overwrite=False):
         """Copy a file or subcollection to a new path in this collection.
 
         :source:
     @must_be_writable
     @synchronized
     def copy(self, source, target_path, source_collection=None, overwrite=False):
         """Copy a file or subcollection to a new path in this collection.
 
         :source:
-          An ArvadosFile, Subcollection, or string with a path to source file or subcollection
+          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
 
         :target_path:
           Destination file or path.  If the target path already exists and is a
 
         :target_path:
           Destination file or path.  If the target path already exists and is a
@@ -781,33 +816,54 @@ class SynchronizedCollectionBase(CollectionBase):
 
         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 
 
         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 
-        if target_name in target_dir:
-            if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
-                target_dir = target_dir[target_name]
-                target_name = sourcecomponents[-1]
-            elif not overwrite:
-                raise IOError((errno.EEXIST, "File already exists"))
+        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
 
 
-        modified_from = None
-        if target_name in target_dir:
-            modified_from = target_dir[target_name]
+        target_dir.add(source_obj, target_name, overwrite)
 
 
-        # Actually make the copy.
-        dup = source_obj.clone(target_dir)
-        target_dir._items[target_name] = dup
-        target_dir._modified = True
+    def portable_manifest_text(self, stream_name="."):
+        """Get the manifest text for this collection, sub collections and files.
 
 
-        if modified_from:
-            self.notify(MOD, target_dir, target_name, (modified_from, dup))
-        else:
-            self.notify(ADD, target_dir, target_name, dup)
+        This method does not flush outstanding blocks to Keep.  It will return
+        a normalized manifest with access tokens stripped.
+
+        :stream_name:
+          Name to use for this stream (directory)
+
+        """
+        return self._get_manifest_text(stream_name, True, True)
 
 
-    @synchronized
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
+        This method will flush outstanding blocks to Keep.  By default, it will
+        not normalize an unmodified manifest or strip access tokens.
+
+        :stream_name:
+          Name to use for this stream (directory)
+
+        :strip:
+          If True, remove signing tokens from block locators if present.
+          If False (default), block locators are left unchanged.
+
+        :normalize:
+          If True, always export the manifest text in normalized form
+          even if the Collection is not modified.  If False (default) and the collection
+          is not modified, return the original manifest text even if it is not
+          in normalized form.
+
+        """
+
+        self._my_block_manager().commit_all()
+        return self._get_manifest_text(stream_name, strip, normalize)
+
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize):
+        """Get the manifest text for this collection, sub collections and files.
+
         :stream_name:
         :stream_name:
-          Name of the stream (directory)
+          Name to use for this stream (directory)
 
         :strip:
           If True, remove signing tokens from block locators if present.
 
         :strip:
           If True, remove signing tokens from block locators if present.
@@ -822,13 +878,12 @@ class SynchronizedCollectionBase(CollectionBase):
         """
 
         if self.modified() or self._manifest_text is None or normalize:
         """
 
         if self.modified() or self._manifest_text is None or normalize:
-            item  = self
             stream = {}
             buf = []
             stream = {}
             buf = []
-            sorted_keys = sorted(item.keys())
-            for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
+            sorted_keys = sorted(self.keys())
+            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                 # Create a stream per file `k`
                 # Create a stream per file `k`
-                arvfile = item[filename]
+                arvfile = self[filename]
                 filestream = []
                 for segment in arvfile.segments():
                     loc = segment.locator
                 filestream = []
                 for segment in arvfile.segments():
                     loc = segment.locator
@@ -841,8 +896,8 @@ class SynchronizedCollectionBase(CollectionBase):
                 stream[filename] = filestream
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
                 stream[filename] = filestream
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
-            for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
-                buf.append(item[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
+            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
             return "".join(buf)
         else:
             if strip:
             return "".join(buf)
         else:
             if strip:
@@ -922,14 +977,14 @@ class SynchronizedCollectionBase(CollectionBase):
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
-        stripped = self.manifest_text(strip=True)
+        stripped = self.portable_manifest_text()
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
     @synchronized
     def __eq__(self, other):
         if other is self:
             return True
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
     @synchronized
     def __eq__(self, other):
         if other is self:
             return True
-        if not isinstance(other, SynchronizedCollectionBase):
+        if not isinstance(other, RichCollectionBase):
             return False
         if len(self._items) != len(other):
             return False
             return False
         if len(self._items) != len(other):
             return False
@@ -944,7 +999,7 @@ class SynchronizedCollectionBase(CollectionBase):
         return not self.__eq__(other)
 
 
         return not self.__eq__(other)
 
 
-class Collection(SynchronizedCollectionBase):
+class Collection(RichCollectionBase):
     """Represents the root of an Arvados Collection.
 
     This class is threadsafe.  The root collection object, all subcollections
     """Represents the root of an Arvados Collection.
 
     This class is threadsafe.  The root collection object, all subcollections
@@ -1166,6 +1221,20 @@ class Collection(SynchronizedCollectionBase):
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
+    @synchronized
+    def manifest_locator(self):
+        """Get the manifest locator, if any.
+
+        The manifest locator will be set when the collection is loaded from an
+        API server record or the portable data hash of a manifest.
+
+        The manifest locator will be None if the collection is newly created or
+        was created directly from manifest text.  The method `save_new()` will
+        assign a manifest locator.
+
+        """
+        return self._manifest_locator
+
     @synchronized
     def clone(self, new_parent=None, readonly=False, new_config=None):
         if new_config is None:
     @synchronized
     def clone(self, new_parent=None, readonly=False, new_config=None):
         if new_config is None:
@@ -1189,21 +1258,21 @@ class Collection(SynchronizedCollectionBase):
         return self._api_response
 
     def find_or_create(self, path, create_type):
         return self._api_response
 
     def find_or_create(self, path, create_type):
-        """See `SynchronizedCollectionBase.find_or_create`"""
+        """See `RichCollectionBase.find_or_create`"""
         if path == ".":
             return self
         else:
             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
 
     def find(self, path):
         if path == ".":
             return self
         else:
             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
 
     def find(self, path):
-        """See `SynchronizedCollectionBase.find`"""
+        """See `RichCollectionBase.find`"""
         if path == ".":
             return self
         else:
             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
 
     def remove(self, path, recursive=False):
         if path == ".":
             return self
         else:
             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
 
     def remove(self, path, recursive=False):
-        """See `SynchronizedCollectionBase.remove`"""
+        """See `RichCollectionBase.remove`"""
         if path == ".":
             raise errors.ArgumentError("Cannot remove '.'")
         else:
         if path == ".":
             raise errors.ArgumentError("Cannot remove '.'")
         else:
@@ -1216,25 +1285,29 @@ class Collection(SynchronizedCollectionBase):
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
-        update=True), write the manifest to Keep, and update the collection
-        record.
+        merge=True, the default), and update the collection record.  Returns
+        the current manifest text.
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
         `save_new()`.
 
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
         `save_new()`.
 
-        :update:
+        :merge:
           Update and merge remote changes before saving.  Otherwise, any
           remote changes will be ignored and overwritten.
 
           Update and merge remote changes before saving.  Otherwise, any
           remote changes will be ignored and overwritten.
 
+        :num_retries:
+          Retry count on API calls (if None,  use the collection default)
+
         """
         if self.modified():
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
         """
         if self.modified():
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
+
             self._my_block_manager().commit_all()
             self._my_block_manager().commit_all()
+
             if merge:
                 self.update()
             if merge:
                 self.update()
-            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
 
             text = self.manifest_text(strip=False)
             self._api_response = self._my_api().collections().update(
 
             text = self.manifest_text(strip=False)
             self._api_response = self._my_api().collections().update(
@@ -1245,23 +1318,30 @@ class Collection(SynchronizedCollectionBase):
             self._manifest_text = self._api_response["manifest_text"]
             self.set_unmodified()
 
             self._manifest_text = self._api_response["manifest_text"]
             self.set_unmodified()
 
+        return self._manifest_text
+
 
     @must_be_writable
     @synchronized
     @retry_method
 
     @must_be_writable
     @synchronized
     @retry_method
-    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
+    def save_new(self, name=None,
+                 create_collection_record=True,
+                 owner_uuid=None,
+                 ensure_unique_name=False,
+                 num_retries=None):
         """Save collection to a new collection record.
 
         """Save collection to a new collection record.
 
-        Commit pending buffer blocks to Keep, write the manifest to Keep, and
-        create a new collection record (if create_collection_record True).
-        After creating a new collection record, this Collection object will be
-        associated with the new record used by `save()`.
+        Commit pending buffer blocks to Keep and, when create_collection_record
+        is True (default), create a new collection record.  After creating a
+        new collection record, this Collection object will be associated with
+        the new record used by `save()`.  Returns the current manifest text.
 
         :name:
           The collection name.
 
 
         :name:
           The collection name.
 
-        :keep_only:
-          Only save the manifest to keep, do not create a collection record.
+        :create_collection_record:
+           If True, create a collection record on the API server.
+           If False, only commit blocks to Keep and return the manifest text.
 
         :owner_uuid:
           the user, or project uuid that will own this collection.
 
         :owner_uuid:
           the user, or project uuid that will own this collection.
@@ -1272,9 +1352,11 @@ class Collection(SynchronizedCollectionBase):
           if it conflicts with a collection with the same name and owner.  If
           False, a name conflict will result in an error.
 
           if it conflicts with a collection with the same name and owner.  If
           False, a name conflict will result in an error.
 
+        :num_retries:
+          Retry count on API calls (if None,  use the collection default)
+
         """
         self._my_block_manager().commit_all()
         """
         self._my_block_manager().commit_all()
-        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
         text = self.manifest_text(strip=False)
 
         if create_collection_record:
         text = self.manifest_text(strip=False)
 
         if create_collection_record:
@@ -1291,8 +1373,10 @@ class Collection(SynchronizedCollectionBase):
 
             self._manifest_locator = self._api_response["uuid"]
 
 
             self._manifest_locator = self._api_response["uuid"]
 
-        self._manifest_text = text
-        self.set_unmodified()
+            self._manifest_text = text
+            self.set_unmodified()
+
+        return text
 
     @synchronized
     def subscribe(self, callback):
 
     @synchronized
     def subscribe(self, callback):
@@ -1342,7 +1426,7 @@ class Collection(SynchronizedCollectionBase):
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
                     blocksize = long(block_locator.group(1))
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
                     blocksize = long(block_locator.group(1))
-                    blocks.append(Range(tok, streamoffset, blocksize))
+                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
@@ -1370,7 +1454,7 @@ class Collection(SynchronizedCollectionBase):
         self.set_unmodified()
 
 
         self.set_unmodified()
 
 
-class Subcollection(SynchronizedCollectionBase):
+class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
     """This is a subdirectory within a collection that doesn't have its own API
     server record.