- if self._streams != None:
- return
- if not self._manifest_text:
- try:
- c = arvados.api('v1').collections().get(
- uuid=self._manifest_locator).execute()
- self._manifest_text = c['manifest_text']
- except Exception as e:
- logging.warning("API lookup failed for collection %s (%s: %s)" %
- (self._manifest_locator, type(e), str(e)))
- self._manifest_text = Keep.get(self._manifest_locator)
- self._streams = []
- for stream_line in self._manifest_text.split("\n"):
- if stream_line != '':
- stream_tokens = stream_line.split()
- self._streams += [stream_tokens]
- self._streams = normalize(self)
-
- # now regenerate the manifest text based on the normalized stream
-
- #print "normalizing", self._manifest_text
- self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
- #print "result", self._manifest_text
-
+ error_via_api = None
+ error_via_keep = None
+ should_try_keep = ((self._manifest_text is None) and
+ util.keep_locator_pattern.match(
+ self._manifest_locator))
+ if ((self._manifest_text is None) and
+ util.signed_locator_pattern.match(self._manifest_locator)):
+ error_via_keep = self._populate_from_keep()
+ if self._manifest_text is None:
+ error_via_api = self._populate_from_api_server()
+ if error_via_api is not None and not should_try_keep:
+ raise error_via_api
+ if ((self._manifest_text is None) and
+ not error_via_keep and
+ should_try_keep):
+ # Looks like a keep locator, and we didn't already try keep above
+ error_via_keep = self._populate_from_keep()
+ if self._manifest_text is None:
+ # Nothing worked!
+ raise arvados.errors.NotFoundError(
+ ("Failed to retrieve collection '{}' " +
+ "from either API server ({}) or Keep ({})."
+ ).format(
+ self._manifest_locator,
+ error_via_api,
+ error_via_keep))
+ self._streams = [sline.split()
+ for sline in self._manifest_text.split("\n")
+ if sline]
+
+ def _populate_first(orig_func):
+     """Decorator for methods that read actual Collection data.
+
+     The wrapper it returns calls self._populate() while self._streams is
+     still None, then forwards every argument to orig_func and hands back
+     whatever orig_func returns.
+     """
+     @functools.wraps(orig_func)
+     def populated_call(self, *args, **kwargs):
+         not_loaded_yet = self._streams is None
+         if not_loaded_yet:
+             self._populate()
+         outcome = orig_func(self, *args, **kwargs)
+         return outcome
+     return populated_call
+
+ @_populate_first
+ def api_response(self):
+     """api_response() -> dict or None
+
+     Hand back the record for this Collection held in self._api_response,
+     i.e. what was fetched from the API server.  For a Collection that
+     exists in Keep but not on the API server this is currently None;
+     future versions may provide a synthetic response instead.
+     """
+     response = self._api_response
+     return response
+
+ @_populate_first
+ def normalize(self):
+     """Regroup this Collection's file data by stream and rebuild its manifest.
+
+     Each file's full path (stream name + "/" + file name) is split into a
+     stream name and a file name, and the file's segments are gathered
+     under that pair.  self._streams is then rebuilt in sorted stream-name
+     order and self._manifest_text is regenerated from the new streams.
+     """
+     # stream name -> file name -> accumulated locators_and_ranges() results
+     files_by_stream = {}
+     for stream in self.all_streams():
+         for datafile in stream.all_files():
+             stream_name, file_name = split(
+                 stream.name() + "/" + datafile.name())
+             # setdefault registers the file even when it has no segments.
+             ranges = files_by_stream.setdefault(
+                 stream_name, {}).setdefault(file_name, [])
+             for segment in datafile.segments:
+                 ranges.extend(
+                     stream.locators_and_ranges(segment[0], segment[1]))
+
+     normalized = []
+     for stream_name in sorted(files_by_stream):
+         normalized.append(
+             normalize_stream(stream_name, files_by_stream[stream_name]))
+     self._streams = normalized
+
+     # Regenerate the manifest text based on the normalized streams
+     manifest_parts = []
+     for tokens in self._streams:
+         reader = StreamReader(tokens, keep=self._my_keep())
+         manifest_parts.append(reader.manifest_text())
+     self._manifest_text = ''.join(manifest_parts)
+
+ @_populate_first
+ def open(self, streampath, filename=None):
+     """open(streampath[, filename]) -> file-like object
+
+     Locate one file in the Collection and return a file-like object for
+     reading it.  The file may be named by a single path string, or by a
+     stream name plus a separate file name.
+
+     Raises ValueError when no stream is named streampath, or when that
+     stream has no file named filename.
+     """
+     if filename is None:
+         # Single-argument form: derive both parts from the full path.
+         streampath, filename = split(streampath)
+     keep_client = self._my_keep()
+     found = None
+     for tokens in self._streams:
+         candidate = StreamReader(tokens, keep_client,
+                                  num_retries=self.num_retries)
+         if candidate.name() == streampath:
+             found = candidate
+             break
+     if found is None:
+         raise ValueError("stream '{}' not found in Collection".
+                          format(streampath))
+     try:
+         return found.files()[filename]
+     except KeyError:
+         raise ValueError("file '{}' not found in Collection stream '{}'".
+                          format(filename, streampath))