4823: Refactoring. ReadOnly Collection is now CollectionReader, replacing old
[arvados.git] / sdk / python / arvados / _normalize_stream.py
diff --git a/sdk/python/arvados/_normalize_stream.py b/sdk/python/arvados/_normalize_stream.py
new file mode 100644 (file)
index 0000000..4423639
--- /dev/null
@@ -0,0 +1,52 @@
def normalize_stream(stream_name, stream):
    """Take manifest stream and return a list of tokens in normalized format.

    :stream_name:
      The name of the stream.  It is emitted unchanged as the first token.

    :stream:
      A dict mapping each filename to a list of `_range.LocatorAndRange`
      objects.  Each segment is read for its `locator`, `block_size`,
      `segment_offset` and `segment_size` attributes.

    :returns:
      A list of tokens: the stream name, then every distinct block locator
      in order of first use (files visited in sorted name order), then one
      "offset:size:filename" token per contiguous span of each file.  Spaces
      in filenames are written as the escape sequence ``\\040``.

    """
    stream_tokens = [stream_name]
    sortedfiles = sorted(stream)

    # Map each block locator to the offset at which that block starts within
    # the normalized stream.  Every referenced block is listed exactly once.
    blocks = {}
    next_block_offset = 0
    for streamfile in sortedfiles:
        for segment in stream[streamfile]:
            if segment.locator not in blocks:
                stream_tokens.append(segment.locator)
                blocks[segment.locator] = next_block_offset
                next_block_offset += segment.block_size

    # Add the empty block if the stream is otherwise empty.
    if not blocks:
        # Imported here because this module has no top-level imports; the
        # constant lives in the sibling `config` module of this package.
        from . import config
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for streamfile in sortedfiles:
        segments = stream[streamfile]
        fout = streamfile.replace(' ', '\\040')

        # A file with no segments is still listed, as a zero-length span.
        if not segments:
            stream_tokens.append("0:0:{0}".format(fout))
            continue

        # Walk the segments, collapsing adjacent ones: a segment that starts
        # exactly where the current span ends extends it; anything else
        # flushes the current span and starts a new one.
        span_start = None
        span_end = None
        for segment in segments:
            seg_start = blocks[segment.locator] + segment.segment_offset
            if span_start is not None and seg_start == span_end:
                span_end += segment.segment_size
                continue
            if span_start is not None:
                stream_tokens.append(
                    "{0}:{1}:{2}".format(span_start, span_end - span_start, fout))
            span_start = seg_start
            span_end = seg_start + segment.segment_size

        # Flush the final span (always present: segments is non-empty here).
        stream_tokens.append(
            "{0}:{1}:{2}".format(span_start, span_end - span_start, fout))

    return stream_tokens