10315: Added back the BlockManager's put threads lazy start, but with a specific...
[arvados.git] / sdk / python / arvados / _normalize_stream.py
1 import config
2
def normalize_stream(stream_name, stream):
    """Take manifest stream and return a list of tokens in normalized format.

    The returned list starts with the (space-escaped) stream name, followed
    by each distinct block locator in order of first reference, followed by
    one or more ``offset:length:filename`` tokens per file.  Files are
    emitted in sorted filename order.

    :stream_name:
      The name of the stream.

    :stream:
      A dict mapping each filename to a list of `_range.LocatorAndRange` objects.
      Each segment is read for its `locator`, `block_size`,
      `segment_offset` and `segment_size` attributes.

    :returns:
      A list of string tokens describing the normalized stream.

    """

    # Spaces separate tokens in the output, so they are escaped as \040.
    stream_name = stream_name.replace(' ', '\\040')
    stream_tokens = [stream_name]
    sortedfiles = sorted(stream)

    # Maps each block locator to the offset at which that block starts
    # within the concatenated stream.
    blocks = {}
    streamoffset = 0
    # Go through each file and add each referenced block exactly once.
    for streamfile in sortedfiles:
        for segment in stream[streamfile]:
            if segment.locator not in blocks:
                stream_tokens.append(segment.locator)
                blocks[segment.locator] = streamoffset
                streamoffset += segment.block_size

    # Add the empty block if the stream is otherwise empty.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for streamfile in sortedfiles:
        # Add in file segments
        current_span = None
        fout = streamfile.replace(' ', '\\040')
        for segment in stream[streamfile]:
            # Position of this segment within the whole stream.
            streamoffset = blocks[segment.locator] + segment.segment_offset
            if current_span is None:
                current_span = [streamoffset, streamoffset + segment.segment_size]
            elif streamoffset == current_span[1]:
                # Collapse adjacent segments: this one starts exactly where
                # the current span ends, so just extend the span.
                current_span[1] += segment.segment_size
            else:
                # Gap or jump backwards: flush the finished span and start
                # a new one at this segment.
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [streamoffset, streamoffset + segment.segment_size]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        # A file with no segments is still listed, as a zero-length entry.
        if not stream[streamfile]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens