caching wip
[arvados.git] / sdk / python / arvados / collection.py
index 3d0005ade7f013c6ec8c0e21acca44bb65c23b27..d7905803e00cec26daf708b8eb551a18848c07d6 100644 (file)
@@ -23,6 +23,43 @@ from stream import *
 import config
 import errors
 
+def normalize_stream(s, stream):
+    stream_tokens = [s]
+    sortedfiles = list(stream.keys())
+    sortedfiles.sort()
+
+    blocks = {}
+    streamoffset = 0L
+    for f in sortedfiles:
+        for b in stream[f]:
+            if b[arvados.LOCATOR] not in blocks:
+                stream_tokens.append(b[arvados.LOCATOR])
+                blocks[b[arvados.LOCATOR]] = streamoffset
+                streamoffset += b[arvados.BLOCKSIZE]
+
+    for f in sortedfiles:
+        current_span = None
+        fout = f.replace(' ', '\\040')
+        for segment in stream[f]:
+            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
+            if current_span == None:
+                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
+            else:
+                if segmentoffset == current_span[1]:
+                    current_span[1] += segment[arvados.SEGMENTSIZE]
+                else:
+                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
+
+        if current_span != None:
+            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+
+        if len(stream[f]) == 0:
+            stream_tokens.append("0:0:{0}".format(fout))            
+
+    return stream_tokens
+    
+
 def normalize(collection):
     streams = {}
     for s in collection.all_streams():
@@ -35,56 +72,25 @@ def normalize(collection):
                 streams[streamname] = {}
             if filename not in streams[streamname]:
                 streams[streamname][filename] = []
-            print streamname, filename
-            streams[streamname][filename].extend(s.locators_and_ranges(f.stream_offset(), f.size()))
-            
-    manifest = ""
+            for r in f.segments:
+                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
+
+    normalized_streams = []
     sortedstreams = list(streams.keys())
     sortedstreams.sort()
-    import pprint
-    pprint.pprint(streams)
     for s in sortedstreams:
-        stream = streams[s]
-        manifest += s
-        sortedfiles = list(stream.keys())
-        sortedfiles.sort()
-        for f in sortedfiles:
-            fn = stream[f]
-            for chunk in fn:
-                manifest += " " + chunk[StreamReader.LOCATOR]
-        for f in sortedfiles:
-            fn = stream[f]
-            streamoffset = 0L
-            fileoffset = 0L
-            easy = True
-            for chunk in fn:
-                if chunk[StreamReader.CHUNKOFFSET] != 0 or streamoffset != fileoffset:
-                    easy = False
-                streamoffset += chunk[StreamReader.BLOCKSIZE]
-                fileoffset += chunk[StreamReader.CHUNKSIZE]
-
-            if easy:
-                manifest += " " + "{0}:{1}:{2}".format(0, fileoffset, f)
-            else:
-                streamoffset = 0
-                fileoffset = 0
-                # not easy
-                for chunk in fn:
-                    manifest += " " + "{0}:{1}:{2}".format(streamoffset + chunk[StreamReader.CHUNKOFFSET], chunk[StreamReader.CHUNKSIZE], f)
-                    streamoffset += chunk[StreamReader.BLOCKSIZE]
-                    fileoffset += chunk[StreamReader.CHUNKSIZE]
+        normalized_streams.append(normalize_stream(s, streams[s]))
+    return normalized_streams
 
-        manifest += "\n"
-    return manifest
 
 class CollectionReader(object):
     def __init__(self, manifest_locator_or_text):
-        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
-            self._manifest_text = manifest_locator_or_text
-            self._manifest_locator = None
-        else:
+        if re.search(r'^[a-f0-9]{32}\+\d+(\+\S)*$', manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
+        else:
+            self._manifest_text = manifest_locator_or_text
+            self._manifest_locator = None
         self._streams = None
 
     def __enter__(self):
@@ -110,12 +116,24 @@ class CollectionReader(object):
             if stream_line != '':
                 stream_tokens = stream_line.split()
                 self._streams += [stream_tokens]
+        self._streams = normalize(self)
+
+        # now regenerate the manifest text based on the normalized stream
+
+        #print "normalizing", self._manifest_text
+        self._manifest_text = ''
+        for stream in self._streams:
+            self._manifest_text += stream[0].replace(' ', '\\040')
+            for t in stream[1:]:
+                self._manifest_text += (" " + t.replace(' ', '\\040'))
+            self._manifest_text += "\n"
+        #print "result     ", self._manifest_text
 
     def all_streams(self):
         self._populate()
         resp = []
         for s in self._streams:
-            resp += [StreamReader(s)]
+            resp.append(StreamReader(s))
         return resp
 
     def all_files(self):
@@ -257,6 +275,7 @@ class CollectionWriter(object):
     def finish(self):
         return Keep.put(self.manifest_text())
 
+
     def manifest_text(self):
         self.finish_current_stream()
         manifest = ''
@@ -269,7 +288,7 @@ class CollectionWriter(object):
             for sfile in stream[2]:
                 manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
             manifest += "\n"
-        return manifest
+        return CollectionReader(manifest).manifest_text()
 
     def data_locators(self):
         ret = []