block_size = data_locators[i][BLOCKSIZE]
block_start = data_locators[i][OFFSET]
block_end = block_start + block_size
-
+
while i < len(data_locators):
locator, block_size, block_start = data_locators[i]
block_end = block_start + block_size
data = ''
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- self._filepos += len(data)
+ self._filepos += len(data)
return data
def readfrom(self, start, size):
dc = bz2.BZ2Decompressor()
return self.decompress(lambda segment: dc.decompress(segment), size)
elif re.search('\.gz$', self._name):
- dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+ dc = zlib.decompressobj(16+zlib.MAX_WBITS)
return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
else:
return self.readall(size)
if data != '':
yield data
+    def as_manifest(self):
+        """Build manifest text describing only this file.
+
+        The stream line is made of:
+          * the stream name '.',
+          * every block locator of the parent stream
+            (self._stream._data_locators), and
+          * one "{seg[LOCATOR]}:{seg[BLOCKSIZE]}:{name}" token per entry in
+            self.segments, where spaces in the file name are escaped as \\040.
+
+        That line (newline-terminated) is passed to
+        arvados.CollectionReader, and its manifest_text() is returned.
+        """
+        block_tokens = [loc[LOCATOR] for loc in self._stream._data_locators]
+        segment_tokens = ["{}:{}:{}".format(part[LOCATOR], part[BLOCKSIZE],
+                                            self.name().replace(' ', '\\040'))
+                          for part in self.segments]
+        stream_line = ' '.join(['.'] + block_tokens + segment_tokens) + '\n'
+        return arvados.CollectionReader(stream_line).manifest_text()
+
class StreamReader(object):
- def __init__(self, tokens, keep=None, debug=False):
+ def __init__(self, tokens, keep=None, debug=False, _empty=False):
self._stream_name = None
self._data_locators = []
self._files = collections.OrderedDict()
self._keep = keep
else:
self._keep = Keep.global_client_object()
-
+
streamoffset = 0L
# parse stream
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
return data
+
+    def manifest_text(self, strip=False):
+        """Render this stream as a single newline-terminated manifest line.
+
+        The line is built from space-separated tokens:
+          * the stream name, with spaces escaped as \\040;
+          * the stream's block locators (self._data_locators); and
+          * for each file in self._files, one
+            "{seg[LOCATOR]}:{seg[BLOCKSIZE]}:{name}" token per segment, where
+            spaces in the file name are escaped as \\040.
+
+        :param strip: if true, each block locator is reduced to the leading
+            text matching r'^[0-9a-f]{32}\\+\\d+', and anything after it is
+            dropped.
+        :raises ValueError: if strip is true and a locator does not start
+            with that pattern.
+        """
+        manifest_text = [self.name().replace(' ', '\\040')]
+        if strip:
+            # Loop-invariant: compile once instead of once per locator.
+            bare_locator = re.compile(r'^[0-9a-f]{32}\+\d+')
+            for d in self._data_locators:
+                m = bare_locator.match(d[LOCATOR])
+                if m is None:
+                    # match() returns None on failure. Report the offending
+                    # locator rather than raising AttributeError on m.group().
+                    raise ValueError("cannot strip malformed locator: {!r}".format(d[LOCATOR]))
+                manifest_text.append(m.group(0))
+        else:
+            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
+        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+                                        for seg in f.segments])
+                              for f in self._files.values()])
+        return ' '.join(manifest_text) + '\n'