1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
# Module-level logger for range/stream tracing.
_logger = logging.getLogger('arvados.streams')

# Log level below 'debug' !
# Custom, very verbose level used by locators_and_ranges/replace_range via
# _logger.log(RANGES_SPAM, ...); any value below logging.DEBUG (10) keeps it
# silent under normal configurations.
RANGES_SPAM = 3
16 __slots__ = ("locator", "range_start", "range_size", "segment_offset")
18 def __init__(self, locator, range_start, range_size, segment_offset=0):
19 self.locator = locator
20 self.range_start = range_start
21 self.range_size = range_size
22 self.segment_offset = segment_offset
25 return "Range(%r, %r, %r, %r)" % (self.locator, self.range_start, self.range_size, self.segment_offset)
27 def __eq__(self, other):
28 return (self.locator == other.locator and
29 self.range_start == other.range_start and
30 self.range_size == other.range_size and
31 self.segment_offset == other.segment_offset)
class LocatorAndRange:
    """A block locator plus the byte range within that block that backs
    one file segment: `segment_size` bytes starting `segment_offset`
    bytes into the block identified by `locator`, whose total size is
    `block_size`.
    """
    __slots__ = ("locator", "block_size", "segment_offset", "segment_size")

    def __init__(self, locator, block_size, segment_offset, segment_size):
        self.locator = locator
        self.block_size = block_size
        self.segment_offset = segment_offset
        self.segment_size = segment_size

    def __eq__(self, other):
        # Equal iff all four fields match.
        return (self.locator == other.locator and
                self.block_size == other.block_size and
                self.segment_offset == other.segment_offset and
                self.segment_size == other.segment_size)

    def __repr__(self):
        # NOTE(review): the `def __repr__(self):` header was missing before the
        # final return in the original; restored here.
        return "LocatorAndRange(%r, %r, %r, %r)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
def first_block(data_locators, range_start):
    """Return the index of the block in `data_locators` whose byte range
    contains `range_start`, or None if `range_start` falls outside every
    block (or the list is empty).

    Arguments:
    * data_locators: list of Range objects; assumes blocks are in order
      and contiguous.
    * range_start: absolute byte offset to locate.
    """
    # An empty list cannot contain any offset.
    if not data_locators:
        return None

    # range_start/block_start is the inclusive lower bound
    # range_end/block_end is the exclusive upper bound
    lo = 0
    hi = len(data_locators)
    i = (lo + hi) // 2
    block_size = data_locators[i].range_size
    block_start = data_locators[i].range_start
    block_end = block_start + block_size

    # perform a binary search for the first block
    # assumes that all of the blocks are contiguous, so range_start is guaranteed
    # to either fall into the range of a block or be outside the block range entirely
    while not (block_start <= range_start < block_end):
        if lo == i:
            # search interval cannot shrink further:
            # must be out of range, fail
            return None
        if range_start > block_start:
            lo = i
        else:
            hi = i
        i = (lo + hi) // 2
        block_size = data_locators[i].range_size
        block_start = data_locators[i].range_start
        block_end = block_start + block_size

    return i
def locators_and_ranges(data_locators, range_start, range_size, limit=None):
    """Get blocks that are covered by a range.

    Returns a list of LocatorAndRange objects.

    Arguments:
    * data_locators: list of Range objects; assumes that blocks are in
      order and contiguous.
    * range_start: start of the range, as an absolute byte offset.
    * range_size: size of the range in bytes.
    * limit: Maximum segments to return, default None (unlimited). Will
      truncate the result if there are more segments needed to cover the
      range than the limit.
    """
    if range_size == 0:
        return []
    resp = []
    range_end = range_start + range_size

    i = first_block(data_locators, range_start)
    if i is None:
        # range_start is outside every block; nothing is covered.
        return []

    # We should always start at the first segment due to the binary
    # search.
    while i < len(data_locators) and len(resp) != limit:
        dl = data_locators[i]
        block_start = dl.range_start
        block_size = dl.range_size
        block_end = block_start + block_size
        _logger.log(RANGES_SPAM,
            "L&R %s range_start %s block_start %s range_end %s block_end %s",
            dl.locator, range_start, block_start, range_end, block_end)
        if range_end <= block_start:
            # range ends before this block starts, so don't look at any more locators
            break

        if range_start >= block_start and range_end <= block_end:
            # range starts and ends in this block
            resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset + (range_start - block_start), range_size))
        elif range_start >= block_start and range_end > block_end:
            # range starts in this block
            resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset + (range_start - block_start), block_end - range_start))
        elif range_start < block_start and range_end > block_end:
            # range starts in a previous block and extends to further blocks
            resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset, block_size))
        elif range_start < block_start and range_end <= block_end:
            # range starts in a previous block and ends in this block
            resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset, range_end - block_start))
        block_start = block_end
        i += 1
    return resp
def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset):
    """
    Replace a file segment range with a new segment.

    NOTE::
    data_locators will be updated in place

    Arguments:
    * data_locators: list of Range objects; assumes that segments are in
      order and contiguous.
    * new_range_start: start of range to replace in data_locators.
    * new_range_size: size of range to replace in data_locators.
    * new_locator: locator for new segment to be inserted.
    * new_segment_offset: segment offset within the locator.
    """
    if new_range_size == 0:
        # Empty replacement is a no-op.
        return

    new_range_end = new_range_start + new_range_size

    if len(data_locators) == 0:
        data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
        return

    last = data_locators[-1]
    if (last.range_start+last.range_size) == new_range_start:
        # Appending at the current end of file.
        if last.locator == new_locator and (last.segment_offset+last.range_size) == new_segment_offset:
            # extend last segment
            last.range_size += new_range_size
        else:
            data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
        return

    i = first_block(data_locators, new_range_start)
    if i is None:
        # new_range_start lies outside every existing segment; nothing to replace.
        return

    # We should always start at the first segment due to the binary
    # search.
    while i < len(data_locators):
        dl = data_locators[i]
        old_segment_start = dl.range_start
        old_segment_end = old_segment_start + dl.range_size
        _logger.log(RANGES_SPAM,
            "RR %s range_start %s segment_start %s range_end %s segment_end %s",
            dl, new_range_start, old_segment_start, new_range_end,
            old_segment_end)
        if new_range_end <= old_segment_start:
            # range ends before this segment starts, so don't look at any more locators
            break

        if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
            # new range starts and ends in old segment
            # split segment into up to 3 pieces
            if (new_range_start-old_segment_start) > 0:
                # keep the left-hand remainder of the old segment
                data_locators[i] = Range(dl.locator, old_segment_start, (new_range_start-old_segment_start), dl.segment_offset)
                data_locators.insert(i+1, Range(new_locator, new_range_start, new_range_size, new_segment_offset))
            else:
                # new range begins exactly where the old segment does; overwrite in place
                data_locators[i] = Range(new_locator, new_range_start, new_range_size, new_segment_offset)
                i -= 1
            if (old_segment_end-new_range_end) > 0:
                # keep the right-hand remainder of the old segment
                data_locators.insert(i+2, Range(dl.locator, new_range_end, (old_segment_end-new_range_end), dl.segment_offset + (new_range_start-old_segment_start) + new_range_size))
            return
        elif old_segment_start <= new_range_start and new_range_end > old_segment_end:
            # range starts in this segment
            # split segment into 2 pieces
            data_locators[i] = Range(dl.locator, old_segment_start, (new_range_start-old_segment_start), dl.segment_offset)
            data_locators.insert(i+1, Range(new_locator, new_range_start, new_range_size, new_segment_offset))
            # skip past both the trimmed old segment and the inserted new one
            i += 2
        elif new_range_start < old_segment_start and new_range_end >= old_segment_end:
            # range starts in a previous segment and extends to further segments
            # delete this segment
            del data_locators[i]
            # do not advance i: the next segment has shifted into slot i
        elif new_range_start < old_segment_start and new_range_end < old_segment_end:
            # range starts in a previous segment and ends in this segment
            # move the starting point of this segment up, and shrink it.
            data_locators[i] = Range(dl.locator, new_range_end, (old_segment_end-new_range_end), dl.segment_offset + (new_range_end-old_segment_start))
            return
def escape(path):
    """Escape a stream or file name for use in a manifest.

    Backslash, colon, and control/space characters (0x00-0x20) are
    replaced with backslash-octal sequences (e.g. " " becomes "\\040").

    NOTE(review): the `def escape(path):` header was missing above this
    return statement in the original; restored here (name grounded by the
    call sites in normalize_stream).
    """
    return re.sub(r'[\\:\000-\040]', lambda m: "\\%03o" % ord(m.group(0)), path)
def normalize_stream(stream_name, stream):
    """Take manifest stream and return a list of tokens in normalized format.

    Arguments:
    * stream_name: str --- The name of the stream.
    * stream: dict --- A dict mapping each filename to a list of
      `_range.LocatorAndRange` objects.
    """
    stream_name = escape(stream_name)
    stream_tokens = [stream_name]
    sortedfiles = list(stream.keys())
    # Normalized manifests list files in sorted order.
    sortedfiles.sort()

    blocks = {}       # locator -> byte offset of that block within the stream
    streamoffset = 0
    # Go through each file and add each referenced block exactly once.
    for streamfile in sortedfiles:
        for segment in stream[streamfile]:
            if segment.locator not in blocks:
                stream_tokens.append(segment.locator)
                blocks[segment.locator] = streamoffset
                streamoffset += segment.block_size

    # Add the empty block if the stream is otherwise empty.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for streamfile in sortedfiles:
        # Add in file segments
        current_span = None
        fout = escape(streamfile)
        for segment in stream[streamfile]:
            # Collapse adjacent segments
            streamoffset = blocks[segment.locator] + segment.segment_offset
            if current_span is None:
                current_span = [streamoffset, streamoffset + segment.segment_size]
            else:
                if streamoffset == current_span[1]:
                    # contiguous with the previous segment; extend it
                    current_span[1] += segment.segment_size
                else:
                    # flush the previous span and start a new one
                    stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [streamoffset, streamoffset + segment.segment_size]

        if current_span is not None:
            stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[streamfile]:
            # Zero-length files still get a 0:0:name token.
            stream_tokens.append(u"0:0:{0}".format(fout))

    return stream_tokens