class CollectionReader(CollectionBase):
    """Read-only access to the streams and files described by a manifest.

    The manifest is either supplied directly as text or looked up lazily
    (from Keep and/or the API server) the first time a data-reading method
    is called.
    """

    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.

        Raises errors.ArgumentError if the first argument matches none of
        the Keep locator, Collection UUID, or manifest patterns.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if (re.match(util.keep_locator_pattern, manifest_locator_or_text) or
                re.match(util.collection_uuid_pattern,
                         manifest_locator_or_text)):
            # A locator or UUID: the manifest text is fetched on first use.
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        # Stays None until _populate() has parsed the manifest text.
        self._streams = None

    def _populate_from_api_server(self):
        """Fetch the manifest text through the API server.

        Returns None on success, or the exception that was raised.
        """
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        """Fetch the manifest text directly from Keep.

        Returns None on success, or the exception that was raised.
        """
        # Retrieve a manifest directly from Keep.  This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
            return None
        except Exception as e:
            return e

    def _populate(self):
        """Make sure the manifest text is loaded, then parse it into streams.

        Lookup order when only a locator is known:
        1. Keep, if the locator matches util.signed_locator_pattern.
        2. The API server.
        3. Keep, if the locator matches util.keep_locator_pattern and Keep
           has not already been tried.

        Raises the API lookup error directly when the locator is not a Keep
        locator, and arvados.errors.NotFoundError when every attempt failed.
        """
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
                error_via_keep is None and
                should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # One token list per non-empty manifest line.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    # NOTE: deliberately a plain function while the class body executes.
    # It is applied as a decorator below, and staticmethod objects are not
    # callable before Python 3.10.  It is rebound as a staticmethod at the
    # end of the class body.
    def _populate_first(orig_func):
        """Decorator for methods that read actual Collection data.

        The wrapper calls self._populate() when self._streams is still None
        and then delegates to the wrapped method.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        """Regroup this reader's files by stream name, in place.

        Replaces self._streams with one normalized stream per stream name
        (in sorted order) and regenerates self._manifest_text from them.
        """
        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                # The entry is created even when the file has no segments.
                ranges = streams.setdefault(streamname, {}).setdefault(
                    filename, [])
                for r in f.segments:
                    ranges.extend(
                        s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.

        Raises ValueError if the stream, or the file within it, is not found.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            # Also reached when the Collection has no streams at all.
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    @_populate_first
    def all_streams(self):
        """Return a list with one StreamReader per stream in the manifest."""
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Yield every file of every stream, in stream order."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        """Return this Collection's manifest text.

        Arguments:
        * strip: If true, return the result of self.stripped_manifest()
          (defined outside this class) instead of the stored text.
        * normalize: If true, return the manifest of a normalized copy;
          this reader itself is left unchanged.  `strip` is applied to
          that copy.
        """
        if normalize:
            # Normalize a scratch reader rather than self.  Give it our
            # clients and retry setting so it does not have to build fresh
            # ones (normalize() needs a Keep client).
            cr = CollectionReader(
                self.manifest_text(),
                api_client=self._api_client,
                keep_client=self._keep_client,
                num_retries=self.num_retries)
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text

    # Every in-class use of the decorator is above this line, so it is now
    # safe to expose it as a static method.
    _populate_first = staticmethod(_populate_first)
-
-
-class _WriterFile(ArvadosFileBase):