17706: Merge branch 'master' into 17706-costanalyzer-uncommitted-container-requests
[arvados.git] / sdk / python / arvados / _normalize_stream.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from . import config
7
8 import re
9
10 def escape(path):
11     path = re.sub('\\\\', lambda m: '\\134', path)
12     path = re.sub('[:\000-\040]', lambda m: "\\%03o" % ord(m.group(0)), path)
13     return path
14
15 def normalize_stream(stream_name, stream):
16     """Take manifest stream and return a list of tokens in normalized format.
17
18     :stream_name:
19       The name of the stream.
20
21     :stream:
22       A dict mapping each filename to a list of `_range.LocatorAndRange` objects.
23
24     """
25
26     stream_name = escape(stream_name)
27     stream_tokens = [stream_name]
28     sortedfiles = list(stream.keys())
29     sortedfiles.sort()
30
31     blocks = {}
32     streamoffset = 0
33     # Go through each file and add each referenced block exactly once.
34     for streamfile in sortedfiles:
35         for segment in stream[streamfile]:
36             if segment.locator not in blocks:
37                 stream_tokens.append(segment.locator)
38                 blocks[segment.locator] = streamoffset
39                 streamoffset += segment.block_size
40
41     # Add the empty block if the stream is otherwise empty.
42     if len(stream_tokens) == 1:
43         stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
44
45     for streamfile in sortedfiles:
46         # Add in file segments
47         current_span = None
48         fout = escape(streamfile)
49         for segment in stream[streamfile]:
50             # Collapse adjacent segments
51             streamoffset = blocks[segment.locator] + segment.segment_offset
52             if current_span is None:
53                 current_span = [streamoffset, streamoffset + segment.segment_size]
54             else:
55                 if streamoffset == current_span[1]:
56                     current_span[1] += segment.segment_size
57                 else:
58                     stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
59                     current_span = [streamoffset, streamoffset + segment.segment_size]
60
61         if current_span is not None:
62             stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
63
64         if not stream[streamfile]:
65             stream_tokens.append(u"0:0:{0}".format(fout))
66
67     return stream_tokens