X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/519c5922c34da64a8cce9e4d0030892e9b4bdd83..878620a8b1827ed3f58e267a89f76c2dbeaa4b65:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 25356695d0..f7364758a5 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -22,6 +22,43 @@ from keep import * from stream import * import config import errors +import util + +def normalize_stream(s, stream): + stream_tokens = [s] + sortedfiles = list(stream.keys()) + sortedfiles.sort() + + blocks = {} + streamoffset = 0L + for f in sortedfiles: + for b in stream[f]: + if b[arvados.LOCATOR] not in blocks: + stream_tokens.append(b[arvados.LOCATOR]) + blocks[b[arvados.LOCATOR]] = streamoffset + streamoffset += b[arvados.BLOCKSIZE] + + for f in sortedfiles: + current_span = None + fout = f.replace(' ', '\\040') + for segment in stream[f]: + segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET] + if current_span == None: + current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]] + else: + if segmentoffset == current_span[1]: + current_span[1] += segment[arvados.SEGMENTSIZE] + else: + stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)) + current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]] + + if current_span != None: + stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)) + + if len(stream[f]) == 0: + stream_tokens.append("0:0:{0}".format(fout)) + + return stream_tokens def normalize(collection): streams = {} @@ -35,56 +72,28 @@ def normalize(collection): streams[streamname] = {} if filename not in streams[streamname]: streams[streamname][filename] = [] - streams[streamname][filename].extend(s.locators_and_ranges(f.stream_offset(), f.size())) + for r in f.segments: + streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1])) normalized_streams = [] sortedstreams = list(streams.keys()) sortedstreams.sort() for s in sortedstreams: - stream = streams[s] - stream_tokens = [s] - - sortedfiles = list(stream.keys()) - sortedfiles.sort() - - blocks = {} - streamoffset = 0L - for f in sortedfiles: - for b in stream[f]: - if b[StreamReader.LOCATOR] not in blocks: - stream_tokens.append(b[StreamReader.LOCATOR]) - blocks[b[StreamReader.LOCATOR]] = streamoffset - streamoffset += b[StreamReader.BLOCKSIZE] - - for f in sortedfiles: - current_span = None - fout = f.replace(' ', '\\040') - for chunk in stream[f]: - chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.CHUNKOFFSET] - if current_span == None: - current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]] - else: - if chunkoffset == current_span[1]: - current_span[1] += chunk[StreamReader.CHUNKSIZE] - else: - stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)) - current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]] - - if current_span != None: - stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)) - - normalized_streams.append(stream_tokens) + normalized_streams.append(normalize_stream(s, streams[s])) return normalized_streams class CollectionReader(object): def __init__(self, manifest_locator_or_text): - if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text): + if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text): + self._manifest_locator = manifest_locator_or_text + self._manifest_text = None + elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text): self._manifest_text = manifest_locator_or_text self._manifest_locator = None else: - self._manifest_locator = manifest_locator_or_text - self._manifest_text = None + raise errors.ArgumentError( + "Argument to CollectionReader must be a manifest or a collection UUID") self._streams = None def __enter__(self): @@ -114,20 +123,16 @@ class CollectionReader(object): # now regenerate the manifest text based on the normalized stream - #print "normalizing", self._manifest_text - self._manifest_text = '' - for stream in self._streams: - self._manifest_text += stream[0].replace(' ', '\\040') - for t in stream[1:]: - self._manifest_text += (" " + t.replace(' ', '\\040')) - self._manifest_text += "\n" - #print "result ", self._manifest_text + #print "normalizing", self._manifest_text + self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams]) + #print "result", self._manifest_text + def all_streams(self): self._populate() resp = [] for s in self._streams: - resp += [StreamReader(s)] + resp.append(StreamReader(s)) return resp def all_files(self): @@ -269,19 +274,21 @@ class CollectionWriter(object): def finish(self): return Keep.put(self.manifest_text()) - def manifest_text(self): self.finish_current_stream() manifest = '' + for stream in self._finished_streams: if not re.search(r'^\.(/.*)?$', stream[0]): manifest += './' manifest += stream[0].replace(' ', '\\040') - for locator in stream[1]: - manifest += " %s" % locator - for sfile in stream[2]: - manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) + manifest += ' ' + ' '.join(stream[1]) + manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) manifest += "\n" + + #print 'writer',manifest + #print 'after reader',CollectionReader(manifest).manifest_text() + return CollectionReader(manifest).manifest_text() def data_locators(self):