X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0c9dd4b566696e5de7bbe82d000997acc978dcfa..456967e1991ea8adc30038b60f5c34703b47b694:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 12fa9ae30d..30828732d8 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -470,13 +470,14 @@ class ResumableCollectionWriter(CollectionWriter): "resumable writer can't accept unsourced data") return super(ResumableCollectionWriter, self).write(data) + ADD = "add" DEL = "del" MOD = "mod" FILE = "file" COLLECTION = "collection" -class SynchronizedCollectionBase(CollectionBase): +class RichCollectionBase(CollectionBase): """Base class for Collections and Subcollections. Implements the majority of functionality relating to accessing items in the @@ -550,7 +551,7 @@ class SynchronizedCollectionBase(CollectionBase): self._items[pathcomponents[0]] = item self._modified = True self.notify(ADD, self, pathcomponents[0], item) - if isinstance(item, SynchronizedCollectionBase): + if isinstance(item, RichCollectionBase): return item.find_or_create(pathcomponents[1], create_type) else: raise IOError((errno.ENOTDIR, "Interior path components must be subcollection")) @@ -573,7 +574,7 @@ class SynchronizedCollectionBase(CollectionBase): if len(pathcomponents) == 1: return item else: - if isinstance(item, SynchronizedCollectionBase): + if isinstance(item, RichCollectionBase): if pathcomponents[1]: return item.find(pathcomponents[1]) else: @@ -702,7 +703,7 @@ class SynchronizedCollectionBase(CollectionBase): def exists(self, path): """Test if there is a file or collection at `path`.""" - return self.find(path) != None + return self.find(path) is not None @must_be_writable @synchronized @@ -721,7 +722,7 @@ class SynchronizedCollectionBase(CollectionBase): if item is None: raise IOError((errno.ENOENT, "File not found")) if len(pathcomponents) == 1: - if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: + if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: raise IOError((errno.ENOTEMPTY, "Subcollection not empty")) deleteditem = self._items[pathcomponents[0]] del self._items[pathcomponents[0]] @@ -737,13 +738,47 @@ class SynchronizedCollectionBase(CollectionBase): def clone(self): raise NotImplementedError() + @must_be_writable + @synchronized + def add(self, source_obj, target_name, overwrite=False): + """Copy a file or subcollection to this collection. + + :source_obj: + An ArvadosFile, or Subcollection object + + :target_name: + Destination item name. If the target name already exists and is a + file, this will raise an error unless you specify `overwrite=True`. + + :overwrite: + Whether to overwrite target file if it already exists. + + """ + + if target_name in self and not overwrite: + raise IOError((errno.EEXIST, "File already exists")) + + modified_from = None + if target_name in self: + modified_from = self[target_name] + + # Actually make the copy. + dup = source_obj.clone(self) + self._items[target_name] = dup + self._modified = True + + if modified_from: + self.notify(MOD, self, target_name, (modified_from, dup)) + else: + self.notify(ADD, self, target_name, dup) + @must_be_writable @synchronized def copy(self, source, target_path, source_collection=None, overwrite=False): """Copy a file or subcollection to a new path in this collection. :source: - An ArvadosFile, Subcollection, or string with a path to source file or subcollection + A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object. :target_path: Destination file or path. If the target path already exists and is a @@ -781,33 +816,54 @@ class SynchronizedCollectionBase(CollectionBase): target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) - if target_name in target_dir: - if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents: - target_dir = target_dir[target_name] - target_name = sourcecomponents[-1] - elif not overwrite: - raise IOError((errno.EEXIST, "File already exists")) + if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents: + target_dir = target_dir[target_name] + target_name = sourcecomponents[-1] - modified_from = None - if target_name in target_dir: - modified_from = target_dir[target_name] + target_dir.add(source_obj, target_name, overwrite) - # Actually make the copy. - dup = source_obj.clone(target_dir) - target_dir._items[target_name] = dup - target_dir._modified = True + def portable_manifest_text(self, stream_name="."): + """Get the manifest text for this collection, sub collections and files. - if modified_from: - self.notify(MOD, target_dir, target_name, (modified_from, dup)) - else: - self.notify(ADD, target_dir, target_name, dup) + This method does not flush outstanding blocks to Keep. It will return + a normalized manifest with access tokens stripped. + + :stream_name: + Name to use for this stream (directory) + + """ + return self._get_manifest_text(stream_name, True, True) - @synchronized def manifest_text(self, stream_name=".", strip=False, normalize=False): """Get the manifest text for this collection, sub collections and files. + This method will flush outstanding blocks to Keep. By default, it will + not normalize an unmodified manifest or strip access tokens. + + :stream_name: + Name to use for this stream (directory) + + :strip: + If True, remove signing tokens from block locators if present. + If False (default), block locators are left unchanged. + + :normalize: + If True, always export the manifest text in normalized form + even if the Collection is not modified. If False (default) and the collection + is not modified, return the original manifest text even if it is not + in normalized form. + + """ + + self._my_block_manager().commit_all() + return self._get_manifest_text(stream_name, strip, normalize) + + @synchronized + def _get_manifest_text(self, stream_name, strip, normalize): + """Get the manifest text for this collection, sub collections and files. + :stream_name: - Name of the stream (directory) + Name to use for this stream (directory) :strip: If True, remove signing tokens from block locators if present. @@ -822,13 +878,12 @@ class SynchronizedCollectionBase(CollectionBase): """ if self.modified() or self._manifest_text is None or normalize: - item = self stream = {} buf = [] - sorted_keys = sorted(item.keys()) - for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]: + sorted_keys = sorted(self.keys()) + for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: # Create a stream per file `k` - arvfile = item[filename] + arvfile = self[filename] filestream = [] for segment in arvfile.segments(): loc = segment.locator @@ -841,8 +896,8 @@ class SynchronizedCollectionBase(CollectionBase): stream[filename] = filestream if stream: buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n") - for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]: - buf.append(item[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip)) + for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: + buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True)) return "".join(buf) else: if strip: @@ -922,14 +977,14 @@ class SynchronizedCollectionBase(CollectionBase): def portable_data_hash(self): """Get the portable data hash for this collection's manifest.""" - stripped = self.manifest_text(strip=True) + stripped = self.portable_manifest_text() return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped)) @synchronized def __eq__(self, other): if other is self: return True - if not isinstance(other, SynchronizedCollectionBase): + if not isinstance(other, RichCollectionBase): return False if len(self._items) != len(other): return False @@ -944,7 +999,7 @@ class SynchronizedCollectionBase(CollectionBase): return not self.__eq__(other) -class Collection(SynchronizedCollectionBase): +class Collection(RichCollectionBase): """Represents the root of an Arvados Collection. This class is threadsafe. The root collection object, all subcollections @@ -1166,6 +1221,20 @@ class Collection(SynchronizedCollectionBase): if self._block_manager is not None: self._block_manager.stop_threads() + @synchronized + def manifest_locator(self): + """Get the manifest locator, if any. + + The manifest locator will be set when the collection is loaded from an + API server record or the portable data hash of a manifest. + + The manifest locator will be None if the collection is newly created or + was created directly from manifest text. The method `save_new()` will + assign a manifest locator. + + """ + return self._manifest_locator + @synchronized def clone(self, new_parent=None, readonly=False, new_config=None): if new_config is None: @@ -1189,21 +1258,21 @@ class Collection(SynchronizedCollectionBase): return self._api_response def find_or_create(self, path, create_type): - """See `SynchronizedCollectionBase.find_or_create`""" + """See `RichCollectionBase.find_or_create`""" if path == ".": return self else: return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) def find(self, path): - """See `SynchronizedCollectionBase.find`""" + """See `RichCollectionBase.find`""" if path == ".": return self else: return super(Collection, self).find(path[2:] if path.startswith("./") else path) def remove(self, path, recursive=False): - """See `SynchronizedCollectionBase.remove`""" + """See `RichCollectionBase.remove`""" if path == ".": raise errors.ArgumentError("Cannot remove '.'") else: @@ -1216,25 +1285,29 @@ class Collection(SynchronizedCollectionBase): """Save collection to an existing collection record. Commit pending buffer blocks to Keep, merge with remote record (if - update=True), write the manifest to Keep, and update the collection - record. + merge=True, the default), and update the collection record. Returns + the current manifest text. Will raise AssertionError if not associated with a collection record on the API server. If you want to save a manifest to Keep only, see `save_new()`. - :update: + :merge: Update and merge remote changes before saving. Otherwise, any remote changes will be ignored and overwritten. + :num_retries: + Retry count on API calls (if None, use the collection default) + """ if self.modified(): if not self._has_collection_uuid(): raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_new() for new collections.") + self._my_block_manager().commit_all() + if merge: self.update() - self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries) text = self.manifest_text(strip=False) self._api_response = self._my_api().collections().update( @@ -1245,23 +1318,30 @@ class Collection(SynchronizedCollectionBase): self._manifest_text = self._api_response["manifest_text"] self.set_unmodified() + return self._manifest_text + @must_be_writable @synchronized @retry_method - def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None): + def save_new(self, name=None, + create_collection_record=True, + owner_uuid=None, + ensure_unique_name=False, + num_retries=None): """Save collection to a new collection record. - Commit pending buffer blocks to Keep, write the manifest to Keep, and - create a new collection record (if create_collection_record True). - After creating a new collection record, this Collection object will be - associated with the new record used by `save()`. + Commit pending buffer blocks to Keep and, when create_collection_record + is True (default), create a new collection record. After creating a + new collection record, this Collection object will be associated with + the new record used by `save()`. Returns the current manifest text. :name: The collection name. - :keep_only: - Only save the manifest to keep, do not create a collection record. + :create_collection_record: + If True, create a collection record on the API server. + If False, only commit blocks to Keep and return the manifest text. :owner_uuid: the user, or project uuid that will own this collection. @@ -1272,9 +1352,11 @@ class Collection(SynchronizedCollectionBase): if it conflicts with a collection with the same name and owner. If False, a name conflict will result in an error. + :num_retries: + Retry count on API calls (if None, use the collection default) + """ self._my_block_manager().commit_all() - self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries) text = self.manifest_text(strip=False) if create_collection_record: @@ -1291,8 +1373,10 @@ class Collection(SynchronizedCollectionBase): self._manifest_locator = self._api_response["uuid"] - self._manifest_text = text - self.set_unmodified() + self._manifest_text = text + self.set_unmodified() + + return text @synchronized def subscribe(self, callback): @@ -1342,7 +1426,7 @@ class Collection(SynchronizedCollectionBase): block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok) if block_locator: blocksize = long(block_locator.group(1)) - blocks.append(Range(tok, streamoffset, blocksize)) + blocks.append(Range(tok, streamoffset, blocksize, 0)) streamoffset += blocksize else: state = SEGMENTS @@ -1370,7 +1454,7 @@ class Collection(SynchronizedCollectionBase): self.set_unmodified() -class Subcollection(SynchronizedCollectionBase): +class Subcollection(RichCollectionBase): """This is a subdirectory within a collection that doesn't have its own API server record.