-def normalize_stream(s, stream):
-    """Build the list of normalized manifest tokens for one stream.
-
-    Arguments:
-    * s: The stream name.  It becomes the first token of the result.
-    * stream: A dict mapping each file name to a list of segments.
-      Each segment is indexed with arvados.LOCATOR, arvados.BLOCKSIZE,
-      arvados.OFFSET and arvados.SEGMENTSIZE.
-
-    Returns a list of tokens: the stream name, every distinct block
-    locator in order of first use (or config.EMPTY_BLOCK_LOCATOR when
-    no block is referenced at all), followed by one or more
-    "position:size:filename" tokens per file, files taken in sorted
-    name order.
-    """
-    stream_tokens = [s]
-    sortedfiles = sorted(stream)
-
-    # Maps each distinct locator to the offset at which its block starts
-    # once all distinct blocks are laid end to end in first-use order.
-    blocks = {}
-    # A plain int literal: "0L" only parses on Python 2, where ints are
-    # promoted to long automatically anyway.
-    streamoffset = 0
-    for f in sortedfiles:
-        for b in stream[f]:
-            locator = b[arvados.LOCATOR]
-            if locator not in blocks:
-                stream_tokens.append(locator)
-                blocks[locator] = streamoffset
-                streamoffset += b[arvados.BLOCKSIZE]
-
-    # Only the stream name so far, i.e. no file referenced any block.
-    if len(stream_tokens) == 1:
-        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
-
-    for f in sortedfiles:
-        current_span = None
-        # Spaces in the file name are written as the escape \040.
-        fout = f.replace(' ', '\\040')
-        for segment in stream[f]:
-            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
-            if current_span is None:
-                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-            elif segmentoffset == current_span[1]:
-                # This segment starts exactly where the span ends: merge it.
-                current_span[1] += segment[arvados.SEGMENTSIZE]
-            else:
-                # Not contiguous: emit the finished span and start a new one.
-                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-
-        if current_span is not None:
-            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-
-        # A file without segments still gets a zero-length token.
-        if not stream[f]:
-            stream_tokens.append("0:0:{0}".format(fout))
-
-    return stream_tokens
-
-def normalize(collection):
-    """Return the normalized token lists for every stream in collection.
-
-    Each file is regrouped under the stream name obtained by joining
-    its stream's name and its own name with "/" and splitting at the
-    last "/".  For every (stream name, file name) pair the results of
-    locators_and_ranges() for the file's segments are accumulated.
-
-    Returns a list with one normalize_stream() result per stream name,
-    in sorted stream-name order.
-    """
-    files_by_stream = {}
-    for reader in collection.all_streams():
-        for entry in reader.all_files():
-            full_path = reader.name() + "/" + entry.name()
-            # full_path always contains "/", so this yields exactly two parts.
-            stream_name, file_name = full_path.rsplit("/", 1)
-            ranges = files_by_stream.setdefault(stream_name, {}).setdefault(file_name, [])
-            for seg in entry.segments:
-                ranges.extend(reader.locators_and_ranges(seg[0], seg[1]))
-
-    return [normalize_stream(name, files_by_stream[name])
-            for name in sorted(files_by_stream)]
-
-
-class CollectionReader(object):
- def __init__(self, manifest_locator_or_text, api_client=None):
+class CollectionBase(object):
+    """Context-manager support and helpers shared by collection classes.
+
+    The methods here read self._keep_client, self._api_client and
+    self.num_retries, and call self.manifest_text().  None of those are
+    defined in this class, so subclasses are expected to provide them.
+    """
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        # Nothing to clean up; returning None lets any exception propagate.
+        return None
+
+    def _my_keep(self):
+        """Return the Keep client, creating and caching it on first use."""
+        if self._keep_client is not None:
+            return self._keep_client
+        self._keep_client = KeepClient(api_client=self._api_client,
+                                       num_retries=self.num_retries)
+        return self._keep_client
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all
+        non-portable hints (i.e., permission signatures and other
+        hints other than size hints) removed from the locators.
+
+        Blank lines are dropped, the fields of each remaining line are
+        re-joined with single spaces, and every line ends with "\\n".
+        """
+        stripped_lines = []
+        for manifest_line in self.manifest_text().split("\n"):
+            tokens = manifest_line.split()
+            if not tokens:
+                continue
+            # The first field is copied through untouched.
+            rebuilt = tokens[:1]
+            for token in tokens[1:]:
+                if re.match(util.keep_locator_pattern, token):
+                    # Remove each "+hint" whose first character is not a digit.
+                    token = re.sub(r'\+[^\d][^\+]*', '', token)
+                rebuilt.append(token)
+            stripped_lines.append(' '.join(rebuilt) + "\n")
+        return ''.join(stripped_lines)
+
+
+class CollectionReader(CollectionBase):
+ def __init__(self, manifest_locator_or_text, api_client=None,
+ keep_client=None, num_retries=0):
+ """Instantiate a CollectionReader.
+
+ This class parses Collection manifests to provide a simple interface
+ to read its underlying files.
+
+ Arguments:
+ * manifest_locator_or_text: One of a Collection UUID, portable data
+ hash, or full manifest text.
+ * api_client: The API client to use to look up Collections. If not
+ provided, CollectionReader will build one from available Arvados
+ configuration.
+ * keep_client: The KeepClient to use to download Collection data.
+ If not provided, CollectionReader will build one from available
+ Arvados configuration.
+ * num_retries: The default number of times to retry failed
+ service requests. Default 0. You may change this value
+ after instantiation, but note those changes may not
+ propagate to related objects like the Keep client.
+ """