@synchronized
def __eq__(self, other):
    """Compare this collection with `other` item by item.

    Returns True when `other` is this very object, or when it is a
    RichCollectionBase with the same number of entries and an equal item
    stored under every name held by this collection.  Anything that is
    not a RichCollectionBase compares unequal.
    """
    if other is self:
        return True
    if not isinstance(other, RichCollectionBase) or len(self._items) != len(other):
        return False
    mine = self._items
    for name in mine:
        # A missing name or a differing item settles the comparison.
        if name not in other or mine[name] != other[name]:
            return False
    return True
+
def __ne__(self, other):
    """Return the logical negation of `__eq__` for the same operand."""
    same = self.__eq__(other)
    return not same
+
+
+class Collection(RichCollectionBase):
+ """Represents the root of an Arvados Collection.
+
+ This class is threadsafe. The root collection object, all subcollections
+ and files are protected by a single lock (i.e. each access locks the entire
+ collection).
+
+ Brief summary of
+ useful methods:
+
+ :To read an existing file:
+ `c.open("myfile", "r")`
+
+ :To write a new file:
+ `c.open("myfile", "w")`
+
+ :To determine if a file exists:
+ `c.find("myfile") is not None`
+
+ :To copy a file:
+ `c.copy("source", "dest")`
+
+ :To delete a file:
+ `c.remove("myfile")`
+
+ :To save to an existing collection record:
+ `c.save()`
+
+ :To save a new collection record:
+ `c.save_new()`
+
+ :To merge remote changes into this object:
+ `c.update()`
+
+ Must be associated with an API server Collection record (during
+ initialization, or using `save_new`) to use `save` or `update`
+
+ """
+
def __init__(self, manifest_locator_or_text=None,
             api_client=None,
             keep_client=None,
             num_retries=None,
             parent=None,
             apiconfig=None,
             block_manager=None):
    """Collection constructor.

    :manifest_locator_or_text:
      One of Arvados collection UUID, block locator of
      a manifest, raw manifest text, or None (to create an empty collection).
    :parent:
      the parent Collection, may be None.
    :apiconfig:
      A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
      Prefer this over supplying your own api_client and keep_client (except in testing).
      Will use default config settings if not specified.
    :api_client:
      The API client object to use for requests.  If not specified, create one using `apiconfig`.
    :keep_client:
      the Keep client to use for requests.  If not specified, create one using `apiconfig`.
    :num_retries:
      the number of retries for API and Keep requests.  Defaults to 0.
    :block_manager:
      the block manager to use.  If not specified, create one.

    Raises `errors.ArgumentError` when `manifest_locator_or_text` matches
    none of the locator, collection UUID or manifest patterns, or when
    loading the manifest fails with IOError or `errors.SyntaxError`.

    """
    super(Collection, self).__init__(parent)
    self._api_client = api_client
    self._keep_client = keep_client
    self._block_manager = block_manager

    if apiconfig:
        self._config = apiconfig
    else:
        self._config = config.settings()

    self.num_retries = num_retries if num_retries is not None else 0
    self._manifest_locator = None
    self._manifest_text = None
    self._api_response = None

    self.lock = threading.RLock()
    self.callbacks = []
    self.events = None

    if manifest_locator_or_text:
        # Classify the argument: both kinds of locator are stored as the
        # manifest locator, raw manifest text is stored as-is.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")

        try:
            self._populate()
        except (IOError, errors.SyntaxError) as e:
            # Interpolate the cause into the message; passing it as a
            # second constructor argument would leave a literal "%s".
            raise errors.ArgumentError("Error processing manifest text: %s" % e)
+
def root_collection(self):
    """Return the root of the collection tree, which is this object itself."""
    root = self
    return root
+
def stream_name(self):
    """Return the stream name of the root collection, always "."."""
    name = "."
    return name
+
def writable(self):
    """Report that this collection accepts modifications (always True)."""
    can_write = True
    return can_write
+
@synchronized
@retry_method
def update(self, other=None, num_retries=None):
    """Merge changes from `other` into this collection.

    When `other` is None, the collection record named by the manifest
    locator is fetched from the API server and its manifest text is used
    instead; `errors.ArgumentError` is raised if no manifest locator is
    set.  The difference between this collection's stored manifest text
    and `other` is then applied to this collection.
    """
    if other is None:
        locator = self._manifest_locator
        if locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
        request = self._my_api().collections().get(uuid=locator)
        response = request.execute(num_retries=num_retries)
        other = CollectionReader(response["manifest_text"])

    baseline = CollectionReader(self._manifest_text)
    changes = baseline.diff(other)
    self.apply(changes)
+
@synchronized
def _my_api(self):
    """Return the API client, creating it from the stored config on first use.

    When a client has to be created here, the Keep client is also
    replaced with the `keep` attribute of the new API client.
    """
    if self._api_client is not None:
        return self._api_client
    self._api_client = ThreadSafeApiCache(self._config)
    self._keep_client = self._api_client.keep
    return self._api_client
+
@synchronized
def _my_keep(self):
    """Return the Keep client, creating one on first use."""
    if self._keep_client is not None:
        return self._keep_client
    if self._api_client is None:
        # No API client yet: building one also assigns the Keep client.
        self._my_api()
    else:
        self._keep_client = KeepClient(api_client=self._api_client)
    return self._keep_client
+
@synchronized
def _my_block_manager(self):
    """Return the block manager, creating one around the Keep client on first use."""
    if self._block_manager is not None:
        return self._block_manager
    self._block_manager = _BlockManager(self._my_keep())
    return self._block_manager
+
def _populate_from_api_server(self):
    """Fetch the collection record from the API server.

    On success the response is kept in `_api_response`, its
    'manifest_text' entry in `_manifest_text`, and None is returned.
    Any exception raised along the way is returned rather than raised.
    """
    # The API client is only obtained here, at the last possible moment,
    # so that clients without access to an API server are not tripped up
    # earlier.  A failure to build the client lands in the except clause
    # like any other lookup failure.
    try:
        request = self._my_api().collections().get(uuid=self._manifest_locator)
        self._api_response = request.execute(num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
    except Exception as err:
        return err
    return None
+
def _populate_from_keep(self):
    """Fetch the manifest text directly from Keep using the manifest locator.

    This has a chance of working if [a] the locator includes a permission
    signature or [b] the Keep services are operating in world-readable
    mode.  Returns None on success, otherwise the exception that was
    raised.
    """
    try:
        keep = self._my_keep()
        self._manifest_text = keep.get(
            self._manifest_locator, num_retries=self.num_retries)
    except Exception as err:
        return err
    return None
+
def _populate(self):
    """Make sure manifest text is available, then import it.

    Does nothing when neither a manifest locator nor manifest text is
    set.  When only a locator is known, the text is looked up in Keep
    and/or on the API server; `errors.NotFoundError` is raised if every
    attempt leaves the text unset.  An API server error is re-raised
    as-is when the locator does not look like a Keep locator.
    """
    if self._manifest_locator is None and self._manifest_text is None:
        return

    api_failure = None
    keep_failure = None
    text_missing = self._manifest_text is None
    looks_like_keep_locator = (
        text_missing and
        util.keep_locator_pattern.match(self._manifest_locator))

    # A signed locator is tried against Keep before the API server.
    if text_missing and util.signed_locator_pattern.match(self._manifest_locator):
        keep_failure = self._populate_from_keep()

    if self._manifest_text is None:
        api_failure = self._populate_from_api_server()
        if api_failure is not None and not looks_like_keep_locator:
            raise api_failure

    if (self._manifest_text is None and
            not keep_failure and
            looks_like_keep_locator):
        # Looks like a keep locator, and Keep was not already tried above.
        keep_failure = self._populate_from_keep()

    if self._manifest_text is None:
        # Nothing worked!
        message = ("Failed to retrieve collection '{}' " +
                   "from either API server ({}) or Keep ({})."
                   ).format(self._manifest_locator, api_failure, keep_failure)
        raise errors.NotFoundError(message)

    # Remember the text that was loaded, then build the contents from it.
    self._baseline_manifest = self._manifest_text
    self._import_manifest(self._manifest_text)
+
+
def _has_collection_uuid(self):
    """Tell whether the manifest locator matches the collection UUID pattern.

    Returns False when no locator is set, otherwise the result of
    `re.match` (a match object, or None when the pattern does not match).
    """
    locator = self._manifest_locator
    if locator is None:
        return False
    return re.match(util.collection_uuid_pattern, locator)
+
def __enter__(self):
    """Enter a `with` block; the collection itself is the context value."""
    context_value = self
    return context_value
+
def __exit__(self, exc_type, exc_value, traceback):
    """Support scoped auto-commit in a with: block.

    When the block finished without raising (`exc_type` is None) and the
    collection is writable and has a collection UUID, `save()` is called.
    Nothing is saved when the block raised, so a half-finished
    modification is not committed.  The block manager's threads, if a
    block manager exists, are stopped in every case, including when
    `save()` itself raises.

    Returns None, so an exception from the block is never suppressed.
    """
    try:
        if exc_type is None and self.writable() and self._has_collection_uuid():
            self.save()
    finally:
        # Always release the worker threads, even if the save failed.
        if self._block_manager is not None:
            self._block_manager.stop_threads()
+
@synchronized
def manifest_locator(self):
    """Return the manifest locator of this collection, or None.

    A locator is present when the collection was loaded from an API
    server record or from the portable data hash of a manifest.  It is
    None for a newly created collection and for one built directly from
    manifest text; `save_new()` will assign one.

    """
    locator = self._manifest_locator
    return locator
+
@synchronized
def clone(self, new_parent=None, readonly=False, new_config=None):
    """Create a new collection object populated from this one.

    :new_parent:
      passed as `parent` to the new collection.
    :readonly:
      if true the new object is a CollectionReader, otherwise a Collection.
    :new_config:
      passed as `apiconfig` to the new collection; this collection's own
      config is used when None.

    The new object is filled in by calling its `_clonefrom` with this
    collection before it is returned.
    """
    config_for_copy = self._config if new_config is None else new_config
    factory = CollectionReader if readonly else Collection
    duplicate = factory(parent=new_parent, apiconfig=config_for_copy)
    duplicate._clonefrom(self)
    return duplicate
+
@synchronized
def api_response(self):
    """Return what the API server reported about this Collection.

    Currently None when the Collection exists in Keep but not on the API
    server.  Future versions may provide a synthetic response.

    """
    response = self._api_response
    return response
+
def find_or_create(self, path, create_type):
    """See `RichCollectionBase.find_or_create`

    "." refers to this collection itself; a leading "./" is dropped
    before the lookup is delegated to the base class.
    """
    if path == ".":
        return self
    relative = path[2:] if path.startswith("./") else path
    return super(Collection, self).find_or_create(relative, create_type)
+
def find(self, path):
    """See `RichCollectionBase.find`

    "." refers to this collection itself; a leading "./" is dropped
    before the lookup is delegated to the base class.
    """
    if path == ".":
        return self
    relative = path[2:] if path.startswith("./") else path
    return super(Collection, self).find(relative)
+
def remove(self, path, recursive=False):
    """See `RichCollectionBase.remove`

    The root itself (".") cannot be removed and raises
    `errors.ArgumentError`; a leading "./" is dropped before the removal
    is delegated to the base class.
    """
    if path == ".":
        raise errors.ArgumentError("Cannot remove '.'")
    relative = path[2:] if path.startswith("./") else path
    return super(Collection, self).remove(relative, recursive)
+
+ @must_be_writable
+ @synchronized
+ @retry_method
+ def save(self, merge=True, num_retries=None):
+ """Save collection to an existing collection record.
+
+ Commit pending buffer blocks to Keep, merge with remote record (if
+ merge=True, the default), and update the collection record. Returns
+ the current manifest text.
+
+ Will raise AssertionError if not associated with a collection record on
+ the API server. If you want to save a manifest to Keep only, see
+ `save_new()`.
+
+ :merge:
+ Update and merge remote changes before saving. Otherwise, any
+ remote changes will be ignored and overwritten.
+
+ :num_retries:
+ Retry count on API calls (if None, use the collection default)