+class Collection(SynchronizedCollectionBase):
+ """Store an Arvados collection, consisting of a set of files and
+ sub-collections.
+ """
+
def __init__(self, manifest_locator_or_text=None,
             parent=None,
             config=None,
             api_client=None,
             keep_client=None,
             num_retries=0,
             block_manager=None,
             sync=SYNC_READONLY):
    """Initialize the collection and, if a manifest source is given, load it.

    :manifest_locator_or_text:
      One of Arvados collection UUID, block locator of
      a manifest, raw manifest text, or None (to create an empty collection).
    :parent:
      the parent Collection, may be None.
    :config:
      the arvados configuration to get the hostname and api token.
      Prefer this over supplying your own api_client and keep_client (except in testing).
      Will use default config settings if not specified.
    :api_client:
      The API client object to use for requests.  If not specified, create one using `config`.
    :keep_client:
      the Keep client to use for requests.  If not specified, create one using `config`.
    :num_retries:
      the number of retries for API and Keep requests.
    :block_manager:
      the block manager to use.  If not specified, create one.
    :sync:
      Set synchronization policy with API server collection record.
      :SYNC_READONLY:
        Collection is read only.  No synchronization.  This mode will
        also forego locking, which gives better performance.
      :SYNC_EXPLICIT:
        Synchronize on explicit request via `merge()` or `save()`
      :SYNC_LIVE:
        Synchronize with server in response to background websocket events,
        on block write, or on file close.

    Raises errors.ArgumentError if `manifest_locator_or_text` is neither a
    locator, a collection UUID nor manifest text, or if SYNC_LIVE is
    requested for something other than a collection UUID.
    """

    self.parent = parent
    self._items = None
    self._api_client = api_client
    self._keep_client = keep_client
    self._block_manager = block_manager
    self._config = config
    self.num_retries = num_retries
    self._manifest_locator = None
    self._manifest_text = None
    self._api_response = None
    self._sync = sync
    self.lock = threading.RLock()
    self.callbacks = []
    # Defined up front so the attribute exists even when no live
    # subscription is ever created.
    self.events = None

    if manifest_locator_or_text:
        # Block locators and collection UUIDs are both stored as the
        # locator; anything that parses as a manifest is kept as text.
        if (re.match(util.keep_locator_pattern, manifest_locator_or_text) or
                re.match(util.collection_uuid_pattern, manifest_locator_or_text)):
            self._manifest_locator = manifest_locator_or_text
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")

        # NOTE(review): loading and live subscription are only performed
        # when an argument was supplied -- confirm this is the intended nesting.
        self._populate()

        if self._sync == SYNC_LIVE:
            if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
                raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
            # The filter list is passed positionally: a positional argument
            # (the callback) may not follow a keyword argument.
            self.events = events.subscribe(
                arvados.api(),
                [["object_uuid", "=", self._manifest_locator]],
                self.on_message)
+
@staticmethod
def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
    """Create a new empty collection, save it under `name`, and return it.

    :name:
      name passed to `save_as()`; `ensure_unique_name=True` is always set.
    :owner_uuid:
      owner passed to `save_as()`, may be None.
    :sync:
      synchronization policy the returned Collection should use.
    """
    # The collection is built in SYNC_EXPLICIT mode for the initial save and
    # switched to the requested policy afterwards, so that `sync` is honored
    # instead of being silently ignored.
    c = Collection(sync=SYNC_EXPLICIT)
    c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
    if sync != SYNC_EXPLICIT:
        c._sync = sync
    if sync == SYNC_LIVE:
        # Mirrors the subscription made by the constructor for SYNC_LIVE.
        # assumes save_as() records the new collection UUID in
        # _manifest_locator -- TODO confirm
        c.events = events.subscribe(
            arvados.api(),
            [["object_uuid", "=", c._manifest_locator]],
            c.on_message)
    return c
+
def _root_lock(self):
    """Return the lock object held by this collection."""
    root = self.lock
    return root
+
def sync_mode(self):
    """Return the synchronization policy this collection was given."""
    mode = self._sync
    return mode
+
@_synchronized
def on_message(self, event=None):
    """Fetch the current manifest from the API server and merge it in.

    :event:
      the notification passed by the event subscription.  It is not
      inspected; the parameter exists so the bound method can be used as
      the subscription callback.
      NOTE(review): presumably the subscription calls back with a single
      event argument -- confirm against events.subscribe().
    """
    response = self._my_api().collections().get(
        uuid=self._manifest_locator,
        select=["manifest_text"]).execute(num_retries=self.num_retries)
    other = import_collection(response["manifest_text"])
    self.merge(other)
+
@_synchronized
def _my_api(self):
    """Return the API client, building one from the config on first use.

    When a client is built here, the Keep client is replaced with the one
    belonging to the new API client.
    """
    if self._api_client is not None:
        return self._api_client
    self._api_client = arvados.api.SafeApi(self._config)
    self._keep_client = self._api_client.keep
    return self._api_client
+
@_synchronized
def _my_keep(self):
    """Return the Keep client, creating one on first use."""
    if self._keep_client is not None:
        return self._keep_client
    if self._api_client is not None:
        # An API client already exists: build a Keep client on top of it.
        self._keep_client = KeepClient(api=self._api_client)
    else:
        # Building the API client also installs its Keep client.
        self._my_api()
    return self._keep_client
+
@_synchronized
def _my_block_manager(self):
    """Return the block manager, creating one around the Keep client on first use."""
    if self._block_manager is not None:
        return self._block_manager
    self._block_manager = BlockManager(self._my_keep())
    return self._block_manager
+
def _populate_from_api_server(self):
    """Load the collection record and manifest text from the API server.

    Returns None on success, or the exception that was raised.  The
    exception is returned rather than propagated so the caller can decide
    whether another source is still worth trying.
    """
    # The API client is obtained as late as possible, so that clients
    # without access to an API server are not tripped up before it is
    # actually needed.  A failure to build the client lands in the same
    # except clause as any other lookup failure.
    try:
        request = self._my_api().collections().get(
            uuid=self._manifest_locator)
        self._api_response = request.execute(num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
    except Exception as err:
        return err
    return None
+
def _populate_from_keep(self):
    """Load the manifest text straight from Keep using the locator.

    This can work when the locator carries a permission signature or when
    the Keep services are world-readable.  Returns None on success, or the
    exception that was raised.
    """
    try:
        keep = self._my_keep()
        self._manifest_text = keep.get(
            self._manifest_locator, num_retries=self.num_retries)
    except Exception as err:
        return err
    return None
+
def _populate(self):
    """Fetch the manifest if necessary and import it into this collection.

    The item table is reset first.  With neither a locator nor manifest
    text there is nothing to load.  Otherwise the manifest text is
    obtained, if not already present, by trying Keep first for signed
    locators, then the API server, then Keep for any other Keep locator.

    Raises the API server error when the lookup fails and the locator is
    not a Keep locator, or arvados.errors.NotFoundError when no source
    produced a manifest.
    """
    self._items = {}
    if self._manifest_locator is None and self._manifest_text is None:
        return

    api_error = None
    keep_error = None
    text_missing = self._manifest_text is None
    should_try_keep = (text_missing and
                       util.keep_locator_pattern.match(self._manifest_locator))

    # A signed locator can be read from Keep directly, so try that first.
    if text_missing and util.signed_locator_pattern.match(self._manifest_locator):
        keep_error = self._populate_from_keep()

    if self._manifest_text is None:
        api_error = self._populate_from_api_server()
        # Without a Keep locator there is no fallback left to try.
        if api_error is not None and not should_try_keep:
            raise api_error

    if self._manifest_text is None and should_try_keep and not keep_error:
        # Looks like a keep locator, and we didn't already try keep above
        keep_error = self._populate_from_keep()

    if self._manifest_text is None:
        # Nothing worked!
        raise arvados.errors.NotFoundError(
            ("Failed to retrieve collection '{}' " +
             "from either API server ({}) or Keep ({})."
             ).format(
                self._manifest_locator,
                api_error,
                keep_error))

    import_manifest(self._manifest_text, self)

    if self._sync == SYNC_READONLY:
        # Now that we're populated, knowing that this will be readonly,
        # forego any further locking.
        self.lock = NoopLock()
+
def __enter__(self):
    """Enter a `with` block; the collection itself is the bound value."""
    return self
+
def __exit__(self, exc_type, exc_value, traceback):
    """Support scoped auto-commit in a with: block.

    A collection that is not read-only is saved (a missing locator is
    tolerated).  Block manager threads, if a block manager was created,
    are stopped even when the save fails.  Returns None, so an exception
    raised inside the with: block is not suppressed.
    """
    try:
        # save() is restricted to writable collections, so a read-only
        # collection (the default mode) must not attempt it on exit.
        if self._sync != SYNC_READONLY:
            self.save(allow_no_locator=True)
    finally:
        if self._block_manager is not None:
            self._block_manager.stop_threads()
+
@_synchronized
def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
    """Return a new Collection holding a copy of this one's contents.

    :new_parent:
      parent of the new Collection, may be None.
    :new_sync:
      synchronization policy of the new Collection.
    :new_config:
      configuration for the new Collection.  If None, this collection's
      own configuration is reused.
    """
    # The default has to be resolved here: `self` does not exist when the
    # signature's default values are evaluated.
    if new_config is None:
        new_config = self._config
    c = Collection(parent=new_parent, config=new_config, sync=new_sync)
    if new_sync == SYNC_READONLY:
        # A read-only copy does not need real locking.
        c.lock = NoopLock()
    c._items = {}
    self._cloneinto(c)
    return c
+
@_synchronized
def api_response(self):
    """
    api_response() -> dict or None

    Return the record most recently fetched for this Collection from the
    API server.  The value is None when no record has been fetched, for
    instance when the Collection exists in Keep but not on the API server.
    Future versions may provide a synthetic response.
    """
    record = self._api_response
    return record
+
+ @_must_be_writable
+ @_synchronized
+ def save(self, allow_no_locator=False):
+ """Commit pending buffer blocks to Keep, write the manifest to Keep, and
+ update the collection record to Keep.
+
+ :allow_no_locator:
+ If there is no collection uuid associated with this
+ Collection and `allow_no_locator` is False, raise an error. If True,
+ do not raise an error.
+ """