4823: Refactoring. ReadOnly Collection is now CollectionReader, replacing old
[arvados.git] / sdk / python / arvados / _normalize_stream.py
def normalize_stream(stream_name, stream):
    """Take manifest stream and return a list of tokens in normalized format.

    :stream_name:
      The name of the stream.

    :stream:
      A dict mapping each filename to a list of `_range.LocatorAndRange` objects.
      Each segment is read through its `locator`, `block_size`,
      `segment_offset` and `segment_size` attributes.

    Returns a list of strings laid out as:

    1. the stream name;
    2. every distinct block locator, in order of first reference when the
       files are visited in sorted filename order (or the empty-block
       locator if no file references any block);
    3. for each file in sorted order, one "offset:length:filename" token per
       contiguous span of stream data, or "0:0:filename" for a file with no
       segments.  Spaces in filenames are written as the escape ``\\040``.

    """
    sortedfiles = sorted(stream)
    stream_tokens = [stream_name]

    # Map each block locator to the offset at which that block starts within
    # the normalized stream.  A plain int literal is used here: the previous
    # `0L` spelling is Python 2-only syntax and fails to parse on Python 3.
    block_offsets = {}
    next_block_offset = 0

    # Go through each file and add each referenced block exactly once.
    for streamfile in sortedfiles:
        for segment in stream[streamfile]:
            if segment.locator not in block_offsets:
                stream_tokens.append(segment.locator)
                block_offsets[segment.locator] = next_block_offset
                next_block_offset += segment.block_size

    # Add the empty block if the stream is otherwise empty.
    # NOTE(review): `config` is not imported in the visible part of this
    # module -- confirm it is in scope, otherwise this branch raises NameError.
    if not block_offsets:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    file_token = "{0}:{1}:{2}".format
    for streamfile in sortedfiles:
        segments = stream[streamfile]
        fout = streamfile.replace(' ', '\\040')

        # [start, end) of the span currently being accumulated, expressed in
        # stream offsets; None until the first segment has been seen.
        current_span = None
        for segment in segments:
            start = block_offsets[segment.locator] + segment.segment_offset
            if current_span is not None and start == current_span[1]:
                # Collapse adjacent segments into a single span.
                current_span[1] += segment.segment_size
                continue
            if current_span is not None:
                # Discontinuity: flush the finished span before starting anew.
                stream_tokens.append(
                    file_token(current_span[0], current_span[1] - current_span[0], fout))
            current_span = [start, start + segment.segment_size]

        if current_span is not None:
            stream_tokens.append(
                file_token(current_span[0], current_span[1] - current_span[0], fout))

        # A file with no segments still needs an entry in the stream.
        if not segments:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens