21811: Merge branch 'main' into 21811-side-favorites-test
[arvados.git] / sdk / python / arvados / _internal / streams.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import logging
6 import re
7
8 from .. import config
9
10 _logger = logging.getLogger('arvados.streams')
11
12 # Log level below 'debug' !
13 RANGES_SPAM = 9
14
15 class Range:
16     __slots__ = ("locator", "range_start", "range_size", "segment_offset")
17
18     def __init__(self, locator, range_start, range_size, segment_offset=0):
19         self.locator = locator
20         self.range_start = range_start
21         self.range_size = range_size
22         self.segment_offset = segment_offset
23
24     def __repr__(self):
25         return "Range(%r, %r, %r, %r)" % (self.locator, self.range_start, self.range_size, self.segment_offset)
26
27     def __eq__(self, other):
28         return (self.locator == other.locator and
29                 self.range_start == other.range_start and
30                 self.range_size == other.range_size and
31                 self.segment_offset == other.segment_offset)
32
33
34 class LocatorAndRange:
35     __slots__ = ("locator", "block_size", "segment_offset", "segment_size")
36
37     def __init__(self, locator, block_size, segment_offset, segment_size):
38         self.locator = locator
39         self.block_size = block_size
40         self.segment_offset = segment_offset
41         self.segment_size = segment_size
42
43     def __eq__(self, other):
44         return  (self.locator == other.locator and
45                  self.block_size == other.block_size and
46                  self.segment_offset == other.segment_offset and
47                  self.segment_size == other.segment_size)
48
49     def __repr__(self):
50         return "LocatorAndRange(%r, %r, %r, %r)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
51
52
53 def first_block(data_locators, range_start):
54     block_start = 0
55
56     # range_start/block_start is the inclusive lower bound
57     # range_end/block_end is the exclusive upper bound
58
59     hi = len(data_locators)
60     lo = 0
61     i = (hi + lo) // 2
62     block_size = data_locators[i].range_size
63     block_start = data_locators[i].range_start
64     block_end = block_start + block_size
65
66     # perform a binary search for the first block
67     # assumes that all of the blocks are contiguous, so range_start is guaranteed
68     # to either fall into the range of a block or be outside the block range entirely
69     while not (range_start >= block_start and range_start < block_end):
70         if lo == i:
71             # must be out of range, fail
72             return None
73         if range_start > block_start:
74             lo = i
75         else:
76             hi = i
77         i = (hi + lo) // 2
78         block_size = data_locators[i].range_size
79         block_start = data_locators[i].range_start
80         block_end = block_start + block_size
81
82     return i
83
84 def locators_and_ranges(data_locators, range_start, range_size, limit=None):
85     """Get blocks that are covered by a range.
86
87     Returns a list of LocatorAndRange objects.
88
89     :data_locators:
90       list of Range objects, assumes that blocks are in order and contiguous
91
92     :range_start:
93       start of range
94
95     :range_size:
96       size of range
97
98     :limit:
99       Maximum segments to return, default None (unlimited).  Will truncate the
100       result if there are more segments needed to cover the range than the
101       limit.
102
103     """
104     if range_size == 0:
105         return []
106     resp = []
107     range_end = range_start + range_size
108
109     i = first_block(data_locators, range_start)
110     if i is None:
111         return []
112
113     # We should always start at the first segment due to the binary
114     # search.
115     while i < len(data_locators) and len(resp) != limit:
116         dl = data_locators[i]
117         block_start = dl.range_start
118         block_size = dl.range_size
119         block_end = block_start + block_size
120         _logger.log(RANGES_SPAM,
121             "L&R %s range_start %s block_start %s range_end %s block_end %s",
122             dl.locator, range_start, block_start, range_end, block_end)
123         if range_end <= block_start:
124             # range ends before this block starts, so don't look at any more locators
125             break
126
127         if range_start >= block_start and range_end <= block_end:
128             # range starts and ends in this block
129             resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset + (range_start - block_start), range_size))
130         elif range_start >= block_start and range_end > block_end:
131             # range starts in this block
132             resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset + (range_start - block_start), block_end - range_start))
133         elif range_start < block_start and range_end > block_end:
134             # range starts in a previous block and extends to further blocks
135             resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset, block_size))
136         elif range_start < block_start and range_end <= block_end:
137             # range starts in a previous block and ends in this block
138             resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset, range_end - block_start))
139         block_start = block_end
140         i += 1
141     return resp
142
143 def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset):
144     """
145     Replace a file segment range with a new segment.
146
147     NOTE::
148     data_locators will be updated in place
149
150     :data_locators:
151       list of Range objects, assumes that segments are in order and contiguous
152
153     :new_range_start:
154       start of range to replace in data_locators
155
156     :new_range_size:
157       size of range to replace in data_locators
158
159     :new_locator:
160       locator for new segment to be inserted
161
162     :new_segment_offset:
163       segment offset within the locator
164
165     """
166     if new_range_size == 0:
167         return
168
169     new_range_end = new_range_start + new_range_size
170
171     if len(data_locators) == 0:
172         data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
173         return
174
175     last = data_locators[-1]
176     if (last.range_start+last.range_size) == new_range_start:
177         if last.locator == new_locator and (last.segment_offset+last.range_size) == new_segment_offset:
178             # extend last segment
179             last.range_size += new_range_size
180         else:
181             data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
182         return
183
184     i = first_block(data_locators, new_range_start)
185     if i is None:
186         return
187
188     # We should always start at the first segment due to the binary
189     # search.
190     while i < len(data_locators):
191         dl = data_locators[i]
192         old_segment_start = dl.range_start
193         old_segment_end = old_segment_start + dl.range_size
194         _logger.log(RANGES_SPAM,
195             "RR %s range_start %s segment_start %s range_end %s segment_end %s",
196             dl, new_range_start, old_segment_start, new_range_end,
197             old_segment_end)
198         if new_range_end <= old_segment_start:
199             # range ends before this segment starts, so don't look at any more locators
200             break
201
202         if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
203             # new range starts and ends in old segment
204             # split segment into up to 3 pieces
205             if (new_range_start-old_segment_start) > 0:
206                 data_locators[i] = Range(dl.locator, old_segment_start, (new_range_start-old_segment_start), dl.segment_offset)
207                 data_locators.insert(i+1, Range(new_locator, new_range_start, new_range_size, new_segment_offset))
208             else:
209                 data_locators[i] = Range(new_locator, new_range_start, new_range_size, new_segment_offset)
210                 i -= 1
211             if (old_segment_end-new_range_end) > 0:
212                 data_locators.insert(i+2, Range(dl.locator, new_range_end, (old_segment_end-new_range_end), dl.segment_offset + (new_range_start-old_segment_start) + new_range_size))
213             return
214         elif old_segment_start <= new_range_start and new_range_end > old_segment_end:
215             # range starts in this segment
216             # split segment into 2 pieces
217             data_locators[i] = Range(dl.locator, old_segment_start, (new_range_start-old_segment_start), dl.segment_offset)
218             data_locators.insert(i+1, Range(new_locator, new_range_start, new_range_size, new_segment_offset))
219             i += 1
220         elif new_range_start < old_segment_start and new_range_end >= old_segment_end:
221             # range starts in a previous segment and extends to further segments
222             # delete this segment
223             del data_locators[i]
224             i -= 1
225         elif new_range_start < old_segment_start and new_range_end < old_segment_end:
226             # range starts in a previous segment and ends in this segment
227             # move the starting point of this segment up, and shrink it.
228             data_locators[i] = Range(dl.locator, new_range_end, (old_segment_end-new_range_end), dl.segment_offset + (new_range_end-old_segment_start))
229             return
230         i += 1
231
232 def escape(path):
233     return re.sub(r'[\\:\000-\040]', lambda m: "\\%03o" % ord(m.group(0)), path)
234
235 def normalize_stream(stream_name, stream):
236     """Take manifest stream and return a list of tokens in normalized format.
237
238     :stream_name:
239       The name of the stream.
240
241     :stream:
242       A dict mapping each filename to a list of `_range.LocatorAndRange` objects.
243
244     """
245
246     stream_name = escape(stream_name)
247     stream_tokens = [stream_name]
248     sortedfiles = list(stream.keys())
249     sortedfiles.sort()
250
251     blocks = {}
252     streamoffset = 0
253     # Go through each file and add each referenced block exactly once.
254     for streamfile in sortedfiles:
255         for segment in stream[streamfile]:
256             if segment.locator not in blocks:
257                 stream_tokens.append(segment.locator)
258                 blocks[segment.locator] = streamoffset
259                 streamoffset += segment.block_size
260
261     # Add the empty block if the stream is otherwise empty.
262     if len(stream_tokens) == 1:
263         stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
264
265     for streamfile in sortedfiles:
266         # Add in file segments
267         current_span = None
268         fout = escape(streamfile)
269         for segment in stream[streamfile]:
270             # Collapse adjacent segments
271             streamoffset = blocks[segment.locator] + segment.segment_offset
272             if current_span is None:
273                 current_span = [streamoffset, streamoffset + segment.segment_size]
274             else:
275                 if streamoffset == current_span[1]:
276                     current_span[1] += segment.segment_size
277                 else:
278                     stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
279                     current_span = [streamoffset, streamoffset + segment.segment_size]
280
281         if current_span is not None:
282             stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
283
284         if not stream[streamfile]:
285             stream_tokens.append(u"0:0:{0}".format(fout))
286
287     return stream_tokens