streams[streamname] = {}
if filename not in streams[streamname]:
streams[streamname][filename] = []
- streams[streamname][filename].extend(s.locators_and_ranges(f.stream_offset(), f.size()))
+ for r in f.segments:
+ streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
normalized_streams = []
sortedstreams = list(streams.keys())
streamoffset = 0L
for f in sortedfiles:
for b in stream[f]:
- if b[StreamReader.LOCATOR] not in blocks:
- stream_tokens.append(b[StreamReader.LOCATOR])
- blocks[b[StreamReader.LOCATOR]] = streamoffset
- streamoffset += b[StreamReader.BLOCKSIZE]
+ if b[arvados.LOCATOR] not in blocks:
+ stream_tokens.append(b[arvados.LOCATOR])
+ blocks[b[arvados.LOCATOR]] = streamoffset
+ streamoffset += b[arvados.BLOCKSIZE]
for f in sortedfiles:
current_span = None
fout = f.replace(' ', '\\040')
- for chunk in stream[f]:
- chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.OFFSET]
+ for segment in stream[f]:
+ segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
if current_span == None:
- current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]
+ current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
else:
- if chunkoffset == current_span[1]:
- current_span[1] += chunk[StreamReader.CHUNKSIZE]
+ if segmentoffset == current_span[1]:
+ current_span[1] += segment[arvados.SEGMENTSIZE]
else:
stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
- current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]
+ current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
if current_span != None:
stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
# Index constants for list-shaped range entries.
# NOTE(review): presumably these index the
# [block locator, blocksize, segment offset, segment size] lists that
# locators_and_ranges() is documented to return -- confirm against the
# neighbouring constant definitions.
OFFSET = 2
SEGMENTSIZE = 3
-def locators_and_ranges(self, data_locators, range_start, range_size):
+def locators_and_ranges(data_locators, range_start, range_size):
'''returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range'''
resp = []
range_start = long(range_start)
return self._filepos
def size(self):
# Returns the OFFSET field plus the BLOCKSIZE field of the last entry
# in the reader's range list.
# Removed (-) lines: last entry of self.data_locators, indexed through
# the class attributes self.OFFSET / self.BLOCKSIZE.
# Added (+) lines: last entry of self.segments, indexed through the bare
# names OFFSET / BLOCKSIZE (presumably module-level constants -- confirm).
# NOTE(review): [-1] raises IndexError on an empty list; neither version
# guards against that here.
- n = self.data_locators[-1]
- return n[self.OFFSET] + n[self.BLOCKSIZE]
+ n = self.segments[-1]
+ return n[OFFSET] + n[BLOCKSIZE]
- def read(self, size, **kwargs):
+ def read(self, size):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
if size == 0:
return ''
self._filepos += len(data)
return data
# Generator: calls self.read(size) repeatedly and yields each non-empty
# result; stops as soon as read() returns the empty string ''.
# size defaults to 2**20 and is passed to read() on every call.
# The added (+) lines drop the **kwargs pass-through that the removed (-)
# lines forwarded to read().
- def readall(self, size=2**20, **kwargs):
+ def readall(self, size=2**20):
while True:
- data = self.read(size, **kwargs)
+ data = self.read(size)
# '' from read() is treated as the end-of-data signal.
if data == '':
break
yield data
n = self.data_locators[-1]
return n[self.OFFSET] + n[self.BLOCKSIZE]
# Method wrapper: the added (+) lines forward range_start and range_size to
# a non-method locators_and_ranges(...) call, passing self.data_locators as
# the first argument, and return its result unchanged.
# NOTE(review): the removed (-) line is not a valid def header (attribute
# expressions used as parameter names, no trailing colon).
- def locators_and_ranges(self.data_locators, self._pos, size)
+ def locators_and_ranges(self, range_start, range_size):
+ return locators_and_ranges(self.data_locators, range_start, range_size)
def read(self, size):
"""Read up to 'size' bytes from the stream, starting at the current file position"""