10 from .arvfile import ArvadosFileBase, StreamFileReader, StreamFileWriter
11 from arvados.retry import retry_method
# NOTE(review): this chunk is a sampled paste -- each line carries its original
# file line number and interior lines are elided; comments below annotate only
# what is visible.
16 def normalize_stream(s, stream):
# Builds a normalized manifest-stream token list: the stream name, then each
# unique data-block locator (deduplicated, in first-seen order), then one
# "pos:size:filename" token per coalesced file segment.
19 stream is a StreamReader object
22 sortedfiles = list(stream.keys())
# Record each unique block locator once, remembering the block's absolute
# byte offset within the concatenated stream so file segments can be
# translated from block-relative to stream-relative positions below.
29 if b[arvados.LOCATOR] not in blocks:
30 stream_tokens.append(b[arvados.LOCATOR])
31 blocks[b[arvados.LOCATOR]] = streamoffset
32 streamoffset += b[arvados.BLOCKSIZE]
# A stream with no data blocks still needs at least one locator token in the
# manifest; emit the canonical empty-block locator.
34 if len(stream_tokens) == 1:
35 stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
# Manifest format escapes spaces in file names as '\040'.
39 fout = f.replace(' ', '\\040')
40 for segment in stream[f]:
# Stream-relative start of this segment: block's stream offset plus the
# segment's offset within that block.
41 segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
42 if current_span is None:
43 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Coalesce adjacent segments into one span while they remain contiguous;
# when contiguity breaks, flush the span as a "start:length:name" token
# and start a new one.
45 if segmentoffset == current_span[1]:
46 current_span[1] += segment[arvados.SEGMENTSIZE]
48 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
49 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Flush the last open span for this file, if any.
51 if current_span is not None:
52 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
# Zero-length files are still represented, as "0:0:name" (branch condition
# elided from this view -- presumably "if no segments"; confirm upstream).
55 stream_tokens.append("0:0:{0}".format(fout))
# NOTE(review): interior lines are elided in this view; comments annotate only
# the visible code. The file is Python 2 (print statement, long()).
60 class StreamReader(object):
# Parses one manifest stream's tokens (stream name, "hash+size" block
# locators, "pos:size:name" file segments) and provides random-access reads
# over the concatenated blocks via a Keep client.
61 def __init__(self, tokens, keep=None, debug=False, _empty=False,
63 self._stream_name = None
# _data_locators: list of [locator, blocksize, streamoffset] triples.
64 self._data_locators = []
# _files: name -> StreamFileReader, insertion-ordered to preserve
# manifest file order.
65 self._files = collections.OrderedDict()
67 self.num_retries = num_retries
# Python 2 print statement -- this module predates Python 3.
73 if debug: print 'tok', tok
# First token is the stream name; '\040' unescapes to a space.
74 if self._stream_name is None:
75 self._stream_name = tok.replace('\\040', ' ')
# Block locator token: 32 hex digits, "+size", optional "+hint" parts.
78 s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
80 blocksize = long(s.group(1))
81 self._data_locators.append([tok, blocksize, streamoffset])
82 streamoffset += blocksize
# File segment token: "pos:size:name".
85 s = re.search(r'^(\d+):(\d+):(\S+)', tok)
87 pos = long(s.group(1))
88 size = long(s.group(2))
89 name = s.group(3).replace('\\040', ' ')
90 if name not in self._files:
91 self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
# Repeated name: append a further segment; third element is the
# file-relative offset (the file's current size).
93 filereader = self._files[name]
94 filereader.segments.append([pos, size, filereader.size()])
# Token matched none of the expected forms.
97 raise errors.SyntaxError("Invalid manifest format")
100 return self._stream_name
106 return self._files.values()
# Stream size = offset of the last block plus its size.
109 n = self._data_locators[-1]
110 return n[OFFSET] + n[BLOCKSIZE]
# Delegates to the module-level locators_and_ranges() helper (same name).
115 def locators_and_ranges(self, range_start, range_size):
116 return locators_and_ranges(self._data_locators, range_start, range_size)
# Fetch one block's content from Keep; overridden in StreamWriter to also
# serve uncommitted buffer blocks.
119 def _keepget(self, locator, num_retries=None):
120 return self._keep.get(locator, num_retries=num_retries)
123 def readfrom(self, start, size, num_retries=None):
124 return self._readfrom(start, size, num_retries=num_retries)
127 def _readfrom(self, start, size, num_retries=None):
128 """Read up to 'size' bytes from the stream, starting at 'start'"""
# Lazily create a KeepClient on first read if none was supplied.
131 if self._keep is None:
132 self._keep = KeepClient(num_retries=self.num_retries)
# Gather the slice of each block that overlaps [start, start+size).
134 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
135 data.append(self._keepget(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
138 def manifest_text(self, strip=False):
139 manifest_text = [self.name().replace(' ', '\\040')]
# strip=True: keep only the bare "hash+size" part of each locator,
# dropping any "+hint" suffixes.
141 for d in self._data_locators:
142 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
143 manifest_text.append(m.group(0))
145 manifest_text.extend([d[LOCATOR] for d in self._data_locators])
# NOTE(review): file segments here are [pos, size, offset] triples, so
# seg[LOCATOR]/seg[BLOCKSIZE] actually index pos/size -- the constant
# names are reused for a differently-shaped list; confirm against the
# module's index-constant definitions.
146 manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name.replace(' ', '\\040'))
147 for seg in f.segments])
148 for f in self._files.values()])
149 return ' '.join(manifest_text) + '\n'
class BufferBlock(object):
    """An automatically growing in-memory byte buffer for pending writes.

    Holds data appended to a stream before it is committed to Keep.
    `locator` is a temporary identifier ("bufferblockN") until the block is
    put to Keep; `locator_list_entry` is a [locator, size, streamoffset]
    triple shared by reference with the owner's _data_locators list, so its
    size element is kept in sync with every append.
    """
    def __init__(self, locator, streamoffset, starting_size=2**16):
        # Temporary locator used as the dict key in StreamWriter.bufferblocks.
        self.locator = locator
        # Backing storage; grows geometrically in append().
        self.buffer_block = bytearray(starting_size)
        # Zero-copy view used for in-place slice assignment in append().
        self.buffer_view = memoryview(self.buffer_block)
        # Number of bytes actually written so far.
        self.write_pointer = 0
        # Shared [locator, size, streamoffset] entry; index 1 is updated
        # whenever data is appended.
        self.locator_list_entry = [locator, 0, streamoffset]

    def append(self, data):
        """Append `data` to the buffer, growing the backing store as needed."""
        while (self.write_pointer + len(data)) > len(self.buffer_block):
            # Grow geometrically, but never below the size actually required:
            # plain doubling loops forever when the buffer was created with
            # starting_size=0 (e.g. _repack_writes passes starting_size=
            # write_total, which can be zero), since 0 * 2 == 0.
            new_size = max(len(self.buffer_block) * 2,
                           self.write_pointer + len(data))
            new_buffer_block = bytearray(new_size)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)
        self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
        self.write_pointer += len(data)
        self.locator_list_entry[1] = self.write_pointer
# NOTE(review): interior lines are elided in this view and the class continues
# past it; comments annotate only the visible code.
171 class StreamWriter(StreamReader):
# Mutable stream: buffers appended data in BufferBlocks until committed to
# Keep. Restricted to exactly one file per stream (asserted below).
172 def __init__(self, tokens, keep=None, debug=False, _empty=False,
174 super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)
176 if len(self._files) != 1:
177 raise AssertionError("StreamWriter can only have one file at a time")
# Replace the parsed StreamFileReader with a writer over the same
# segments, keyed under the same name.
178 sr = self._files.popitem()[1]
179 self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)
# mutex guards shared state; reader methods are wrapped below.
181 self.mutex = threading.Lock()
182 self.current_bblock = None
# bufferblocks: temporary locator -> BufferBlock, for uncommitted data.
183 self.bufferblocks = {}
185 # wrap superclass methods in mutex
186 def _proxy_method(name):
187 method = getattr(StreamReader, name)
188 @functools.wraps(method, ('__name__', '__doc__'))
189 def wrapper(self, *args, **kwargs):
# (elided line presumably acquires self.mutex here -- confirm.)
191 return method(self, *args, **kwargs)
# Install the mutex-wrapped versions as class attributes at class body
# execution time via locals().
194 for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
195 locals()[_method_name] = _proxy_method(_method_name)
# Override: serve reads of uncommitted data straight from the in-memory
# buffer block instead of Keep.
198 def _keepget(self, locator, num_retries=None):
199 if locator in self.bufferblocks:
200 bb = self.bufferblocks[locator]
201 return str(bb.buffer_block[0:bb.write_pointer])
203 return self._keep.get(locator, num_retries=num_retries)
# Start a fresh buffer block positioned at the current end of the stream;
# an empty trailing locator (e.g. the empty-block placeholder) is dropped.
205 def _init_bufferblock(self):
206 last = self._data_locators[-1]
207 streamoffset = last[OFFSET] + last[BLOCKSIZE]
208 if last[BLOCKSIZE] == 0:
209 del self._data_locators[-1]
210 self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
211 self.bufferblocks[self.current_bblock.locator] = self.current_bblock
# locator_list_entry is shared by reference, so appends to the buffer
# block update this stream's locator list automatically.
212 self._data_locators.append(self.current_bblock.locator_list_entry)
214 def _repack_writes(self):
215 '''Test if the buffer block has more data than is referenced by actual segments
216 (this happens when a buffered write over-writes a file range written in
217 a previous buffered write). Re-pack the buffer block for efficiency
218 and to avoid leaking information.
# NOTE(review): segments here are [pos, size, offset] triples, so the
# LOCATOR/BLOCKSIZE/OFFSET constants below index pos/size/offset of a
# segment -- confirm against the module's constant definitions.
220 segs = self._files.values()[0].segments
222 bufferblock_segs = []
224 tmp_segs = copy.copy(segs)
225 while i < len(tmp_segs):
226 # Go through each segment and identify segments that include the buffer block
228 if s[LOCATOR] < self.current_bblock.locator_list_entry[OFFSET] and (s[LOCATOR] + s[BLOCKSIZE]) > self.current_bblock.locator_list_entry[OFFSET]:
229 # The segment straddles the previous block and the current buffer block. Split the segment.
230 b1 = self.current_bblock.locator_list_entry[OFFSET] - s[LOCATOR]
231 b2 = (s[LOCATOR] + s[BLOCKSIZE]) - self.current_bblock.locator_list_entry[OFFSET]
232 bb_seg = [self.current_bblock.locator_list_entry[OFFSET], b2, s[OFFSET]+b1]
233 tmp_segs[i] = [s[LOCATOR], b1, s[OFFSET]]
234 tmp_segs.insert(i+1, bb_seg)
235 bufferblock_segs.append(bb_seg)
237 elif s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]:
238 # The segment's data is in the buffer block.
239 bufferblock_segs.append(s)
242 # Now sum up the segments to get the total bytes
243 # of the file referencing into the buffer block.
244 write_total = sum([s[BLOCKSIZE] for s in bufferblock_segs])
246 if write_total < self.current_bblock.locator_list_entry[BLOCKSIZE]:
247 # There is more data in the buffer block than is actually accounted for by segments, so
248 # re-pack into a new buffer by copying over to a new buffer block.
# NOTE(review): write_total may be 0 here; BufferBlock's grow-by-doubling
# cannot grow from a zero-length buffer, so starting_size=0 risks an
# infinite loop on a later append -- worth a guard or a fix in BufferBlock.
249 new_bb = BufferBlock(self.current_bblock.locator,
250 self.current_bblock.locator_list_entry[OFFSET],
251 starting_size=write_total)
# Copy each still-referenced byte range into the new compact buffer and
# rewrite the segment's stream position to its new packed location.
252 for t in bufferblock_segs:
253 t_start = t[LOCATOR] - self.current_bblock.locator_list_entry[OFFSET]
254 t_end = t_start + t[BLOCKSIZE]
255 t[0] = self.current_bblock.locator_list_entry[OFFSET] + new_bb.write_pointer
256 new_bb.append(self.current_bblock.buffer_block[t_start:t_end])
258 self.current_bblock = new_bb
259 self.bufferblocks[self.current_bblock.locator] = self.current_bblock
260 self._data_locators[-1] = self.current_bblock.locator_list_entry
261 self._files.values()[0].segments = tmp_segs
264 # commit buffer block
266 # TODO: do 'put' in the background?
# Put the buffered bytes to Keep and swap the temporary locator for the
# returned permanent one (enclosing method's def line elided from view).
267 pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
268 self._data_locators[-1][0] = pdh
269 self.current_bblock = None
# (fragment of an elided method -- repack before some commit/flush step.)
273 self._repack_writes()
276 def _append(self, data):
# Internal append; caller is expected to hold the mutex (public append()
# below presumably acquires it -- confirm).
277 if len(data) > config.KEEP_BLOCK_SIZE:
278 raise ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE")
280 if self.current_bblock is None:
281 self._init_bufferblock()
# If this write would overflow the Keep block size, first try to reclaim
# space by repacking; if still too big, commit (elided) and start fresh.
283 if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
284 self._repack_writes()
285 if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
287 self._init_bufferblock()
289 self.current_bblock.append(data)
291 def append(self, data):