From a9c427bdcdb215119bb28d2b53b0f8b462048e72 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 14 Feb 2014 19:56:36 -0500 Subject: [PATCH] Fixing things up --- sdk/python/arvados/collection.py | 23 ++++++++++++----------- sdk/python/arvados/stream.py | 15 ++++++++------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 96ae100014..6727dce52f 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -35,7 +35,8 @@ def normalize(collection): streams[streamname] = {} if filename not in streams[streamname]: streams[streamname][filename] = [] - streams[streamname][filename].extend(s.locators_and_ranges(f.stream_offset(), f.size())) + for r in f.segments: + streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1])) normalized_streams = [] sortedstreams = list(streams.keys()) @@ -51,24 +52,24 @@ def normalize(collection): streamoffset = 0L for f in sortedfiles: for b in stream[f]: - if b[StreamReader.LOCATOR] not in blocks: - stream_tokens.append(b[StreamReader.LOCATOR]) - blocks[b[StreamReader.LOCATOR]] = streamoffset - streamoffset += b[StreamReader.BLOCKSIZE] + if b[arvados.LOCATOR] not in blocks: + stream_tokens.append(b[arvados.LOCATOR]) + blocks[b[arvados.LOCATOR]] = streamoffset + streamoffset += b[arvados.BLOCKSIZE] for f in sortedfiles: current_span = None fout = f.replace(' ', '\\040') - for chunk in stream[f]: - chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.OFFSET] + for segment in stream[f]: + segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET] if current_span == None: - current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]] + current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]] else: - if chunkoffset == current_span[1]: - current_span[1] += chunk[StreamReader.CHUNKSIZE] + if segmentoffset == current_span[1]: + current_span[1] += segment[arvados.SEGMENTSIZE] else: stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)) - current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]] + current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]] if current_span != None: stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)) diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py index 4a53324c8a..3843411771 100644 --- a/sdk/python/arvados/stream.py +++ b/sdk/python/arvados/stream.py @@ -27,7 +27,7 @@ BLOCKSIZE = 1 OFFSET = 2 SEGMENTSIZE = 3 -def locators_and_ranges(self, data_locators, range_start, range_size): +def locators_and_ranges(data_locators, range_start, range_size): '''returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range''' resp = [] range_start = long(range_start) @@ -81,10 +81,10 @@ class StreamFileReader(object): return self._filepos def size(self): - n = self.data_locators[-1] - return n[self.OFFSET] + n[self.BLOCKSIZE] + n = self.segments[-1] + return n[OFFSET] + n[BLOCKSIZE] - def read(self, size, **kwargs): + def read(self, size): """Read up to 'size' bytes from the stream, starting at the current file position""" if size == 0: return '' @@ -96,9 +96,9 @@ class StreamFileReader(object): self._filepos += len(data) return data - def readall(self, size=2**20, **kwargs): + def readall(self, size=2**20): while True: - data = self.read(size, **kwargs) + data = self.read(size) if data == '': break yield data @@ -204,7 +204,8 @@ class StreamReader(object): n = self.data_locators[-1] return n[self.OFFSET] + n[self.BLOCKSIZE] - def locators_and_ranges(self.data_locators, self._pos, size) + def locators_and_ranges(self, range_start, range_size): + return locators_and_ranges(self.data_locators, range_start, range_size) def read(self, size): """Read up to 'size' bytes from the stream, starting at the current file position""" -- 2.39.5