21720: added type assertions for AxiosInstance get
[arvados.git] / sdk / python / arvados / _normalize_stream.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6
7 from . import config
8
def escape(path):
    r"""Return *path* with special characters replaced by octal escapes.

    Every backslash, colon, and character in the range ``\000``-``\040``
    (NUL through space) is rewritten as a backslash followed by the
    character's three-digit, zero-padded octal code point.  All other
    characters are passed through untouched.
    """
    def _as_octal(match):
        # The character class only ever matches a single character.
        return "\\%03o" % ord(match.group(0))

    return re.sub(r'[\\:\000-\040]', _as_octal, path)
11
def normalize_stream(stream_name, stream):
    """Build the normalized token list for one manifest stream.

    :stream_name:
      The name of the stream; it is escaped before being emitted.

    :stream:
      A dict mapping each filename to a list of `_range.LocatorAndRange`
      objects.  Each object is read for its `locator`, `block_size`,
      `segment_offset` and `segment_size` attributes.

    Returns a list whose first item is the escaped stream name, followed
    by each distinct block locator (in order of first use, walking files
    in sorted name order), followed by one ``offset:length:filename``
    token per contiguous span of each file.

    """
    tokens = [escape(stream_name)]
    filenames = sorted(stream.keys())

    # Pass 1: emit every referenced block once and remember the offset at
    # which it starts within the concatenated stream.
    block_offsets = {}
    next_offset = 0
    for filename in filenames:
        for seg in stream[filename]:
            if seg.locator in block_offsets:
                continue
            tokens.append(seg.locator)
            block_offsets[seg.locator] = next_offset
            next_offset += seg.block_size

    # No block was emitted above, so add the empty block locator.
    if not block_offsets:
        tokens.append(config.EMPTY_BLOCK_LOCATOR)

    # Pass 2: emit the file tokens, merging segments that are adjacent in
    # the stream into a single span.
    for filename in filenames:
        escaped_name = escape(filename)
        span_start = span_end = None
        for seg in stream[filename]:
            seg_start = block_offsets[seg.locator] + seg.segment_offset
            if span_start is not None and seg_start == span_end:
                # Segment begins exactly where the open span ends: extend it.
                span_end += seg.segment_size
                continue
            if span_start is not None:
                # Discontinuity: flush the open span before starting anew.
                tokens.append("{0}:{1}:{2}".format(
                    span_start, span_end - span_start, escaped_name))
            span_start = seg_start
            span_end = seg_start + seg.segment_size

        if span_start is not None:
            tokens.append("{0}:{1}:{2}".format(
                span_start, span_end - span_start, escaped_name))

        # A file with no segments still gets a zero-length entry.
        if not stream[filename]:
            tokens.append("0:0:{0}".format(escaped_name))

    return tokens