8937: Merge branch 'master' into 8937-arvput-cached-tokens
[arvados.git] / sdk / python / arvados / _normalize_stream.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from . import config
7
def normalize_stream(stream_name, stream):
    """Build the normalized token list for a single manifest stream.

    :stream_name:
      The name of the stream. Spaces in it are octal-escaped in the output.

    :stream:
      A dict mapping each filename to a list of segment objects
      (documented as `_range.LocatorAndRange`).  Each segment is read for
      its `locator`, `block_size`, `segment_offset` and `segment_size`
      attributes.

    Returns a list of strings: the escaped stream name, then every distinct
    block locator in order of first use (files are visited in sorted name
    order), then one or more ``position:length:filename`` tokens per file.

    """

    tokens = [stream_name.replace(' ', '\\040')]
    filenames = sorted(stream.keys())

    # Pass 1: emit each distinct block once and remember the offset at which
    # it begins within the concatenation of all emitted blocks.
    block_start = {}
    next_block_offset = 0
    for filename in filenames:
        for seg in stream[filename]:
            if seg.locator in block_start:
                continue
            tokens.append(seg.locator)
            block_start[seg.locator] = next_block_offset
            next_block_offset += seg.block_size

    # Only the stream name is present, so no block was referenced: use the
    # empty block locator in its place.
    if len(tokens) == 1:
        tokens.append(config.EMPTY_BLOCK_LOCATOR)

    def _emit_span(begin, end, escaped_name):
        # Append one file token covering stream offsets [begin, end).
        tokens.append("{0}:{1}:{2}".format(begin, end - begin, escaped_name))

    # Pass 2: emit the file tokens, merging segments that are contiguous in
    # stream coordinates into a single span.
    for filename in filenames:
        escaped = filename.replace(' ', '\\040')
        segments = stream[filename]
        span_begin = None
        span_end = None

        for seg in segments:
            position = block_start[seg.locator] + seg.segment_offset
            if span_begin is not None and position == span_end:
                # Starts exactly where the open span stops: extend it.
                span_end += seg.segment_size
                continue
            if span_begin is not None:
                # Not contiguous: flush the open span before starting anew.
                _emit_span(span_begin, span_end, escaped)
            span_begin = position
            span_end = position + seg.segment_size

        if span_begin is not None:
            _emit_span(span_begin, span_end, escaped)

        # A file with no segments is still listed, as a zero-length entry.
        if not segments:
            tokens.append("0:0:{0}".format(escaped))

    return tokens