X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/600378f933f028ba497cac86a978fa71401d209b..04a4fa5843c3511260a750065cf79203ae1663ee:/sdk/python/arvados/stream.py diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py index 6ee12e18aa..e13e1a6eff 100644 --- a/sdk/python/arvados/stream.py +++ b/sdk/python/arvados/stream.py @@ -1,7 +1,6 @@ import gflags import httplib import httplib2 -import logging import os import pprint import sys @@ -71,7 +70,7 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False): block_size = data_locators[i][BLOCKSIZE] block_start = data_locators[i][OFFSET] block_end = block_start + block_size - + while i < len(data_locators): locator, block_size, block_start = data_locators[i] block_end = block_start + block_size @@ -135,9 +134,12 @@ class StreamFileReader(object): return '' data = '' - for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size): - data += self._stream.readfrom(locator+segmentoffset, segmentsize) - self._filepos += len(data) + available_chunks = locators_and_ranges(self.segments, self._filepos, size) + if available_chunks: + locator, blocksize, segmentoffset, segmentsize = available_chunks[0] + data = self._stream.readfrom(locator+segmentoffset, segmentsize) + + self._filepos += len(data) return data def readfrom(self, start, size): @@ -169,7 +171,7 @@ class StreamFileReader(object): dc = bz2.BZ2Decompressor() return self.decompress(lambda segment: dc.decompress(segment), size) elif re.search('\.gz$', self._name): - dc = zlib.decompressobj(16+zlib.MAX_WBITS) + dc = zlib.decompressobj(16+zlib.MAX_WBITS) return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size) else: return self.readall(size) @@ -193,17 +195,22 @@ class StreamFileReader(object): if data != '': yield data + def as_manifest(self): + manifest_text = ['.'] + manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators]) + manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments]) + return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text() + class StreamReader(object): - def __init__(self, tokens, keep=None, debug=False): + def __init__(self, tokens, keep=None, debug=False, _empty=False): self._stream_name = None self._data_locators = [] self._files = collections.OrderedDict() - if keep != None: - self._keep = keep - else: - self._keep = Keep.global_client_object() - + if keep is None: + keep = KeepClient() + self._keep = keep + streamoffset = 0L # parse stream @@ -258,3 +265,16 @@ class StreamReader(object): for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size): data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize] return data + + def manifest_text(self, strip=False): + manifest_text = [self.name().replace(' ', '\\040')] + if strip: + for d in self._data_locators: + m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR]) + manifest_text.append(m.group(0)) + else: + manifest_text.extend([d[LOCATOR] for d in self._data_locators]) + manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040')) + for seg in f.segments]) + for f in self._files.values()]) + return ' '.join(manifest_text) + '\n'