3198: Support repacking buffer blocks when writes are superseded.
[arvados.git] / sdk / python / arvados / stream.py
import collections
import hashlib
import os
import re
import threading
import functools
import copy

from .ranges import *
from .arvfile import ArvadosFileBase, StreamFileReader, StreamFileWriter
from arvados.retry import retry_method
from keep import *
import config
import errors

def normalize_stream(s, stream):
    '''
    s is the stream name
    stream is a dict mapping file names to lists of segment tuples, each
    indexed by the LOCATOR, BLOCKSIZE, OFFSET and SEGMENTSIZE constants
    from .ranges
    '''
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[LOCATOR] not in blocks:
                stream_tokens.append(b[LOCATOR])
                blocks[b[LOCATOR]] = streamoffset
                streamoffset += b[BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[LOCATOR]] + segment[OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

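# A hedged usage sketch of normalize_stream (the locator below is
# md5("foo") plus the block size; tuple indices follow the LOCATOR,
# BLOCKSIZE, OFFSET and SEGMENTSIZE convention from .ranges):
#
#   stream = {'foo': [['acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0, 3]]}
#   normalize_stream('.', stream)
#   # => ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo']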

class StreamReader(object):
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        streamoffset = 0L

        # parse stream
        for tok in tokens:
            if debug: print 'tok', tok
            if self._stream_name is None:
                self._stream_name = tok.replace('\\040', ' ')
                continue

            # block locator token: md5 hash, block size, optional hints
            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = long(s.group(1))
                self._data_locators.append([tok, blocksize, streamoffset])
                streamoffset += blocksize
                continue

            # file segment token: stream position, size, file name
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
                else:
                    filereader = self._files[name]
                    filereader.segments.append([pos, size, filereader.size()])
                continue

            raise errors.SyntaxError("Invalid manifest format")

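    # A hedged construction sketch (the locator is md5("foo") plus size;
    # passing keep=None defers creating a KeepClient until the first read):
    #
    #   tokens = ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt".split()
    #   sr = StreamReader(tokens)
    #   sr.name()           # => "."
    #   sr.files().keys()   # => ['foo.txt']
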
    def name(self):
        return self._stream_name

    def files(self):
        return self._files

    def all_files(self):
        return self._files.values()

    def _size(self):
        n = self._data_locators[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    def size(self):
        return self._size()

    def locators_and_ranges(self, range_start, range_size):
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        return self._readfrom(start, size, num_retries=num_retries)

    @retry_method
    def _readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''
        if self._keep is None:
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
            data.append(self._keepget(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
        return ''.join(data)

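    # Reads spanning block boundaries are assembled from the per-block
    # slices computed by locators_and_ranges(); a hedged sketch, continuing
    # the construction example above:
    #
    #   sr.readfrom(0, 3)   # => 'foo'
    #   sr.readfrom(1, 2)   # => 'oo'
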
    def manifest_text(self, strip=False):
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name.replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
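
    # A hedged sketch of strip=True: everything after the md5 hash and size
    # is dropped from each locator, so a signed locator such as
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>
    # is emitted as
    #   acbd18db4cc2f85cedef654fccc4a4d8+3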


class BufferBlock(object):
    def __init__(self, locator, streamoffset, starting_size=2**16):
        self.locator = locator
        self.buffer_block = bytearray(starting_size)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        # [locator, bytes written, stream offset]; this list is shared with
        # the owning stream's _data_locators.
        self.locator_list_entry = [locator, 0, streamoffset]

    def append(self, data):
        # Double the backing buffer until the pending write fits, then copy
        # the data in through the memoryview and advance the write pointer.
        while (self.write_pointer+len(data)) > len(self.buffer_block):
            new_buffer_block = bytearray(len(self.buffer_block) * 2)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)
        self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
        self.write_pointer += len(data)
        self.locator_list_entry[1] = self.write_pointer
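
    # A hedged growth sketch:
    #
    #   bb = BufferBlock("bufferblock0", streamoffset=0, starting_size=4)
    #   bb.append("hello")       # 5 > 4, so the buffer doubles to 8 bytes
    #   bb.write_pointer         # => 5
    #   bb.locator_list_entry    # => ['bufferblock0', 5, 0]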


class StreamWriter(StreamReader):
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)

        if len(self._files) != 1:
            raise AssertionError("StreamWriter can only have one file at a time")
        sr = self._files.popitem()[1]
        self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)

        self.mutex = threading.Lock()
        self.current_bblock = None
        self.bufferblocks = {}

    # wrap superclass methods in mutex
    def _proxy_method(name):
        method = getattr(StreamReader, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            with self.mutex:
                return method(self, *args, **kwargs)
        return wrapper

    for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
        locals()[_method_name] = _proxy_method(_method_name)
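    # The loop above rebinds each listed StreamReader method in this class
    # body to a mutex-acquiring wrapper, so StreamWriter.readfrom (for
    # example) takes self.mutex while StreamReader.readfrom is untouched.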

    @retry_method
    def _keepget(self, locator, num_retries=None):
        # Data still sitting in a buffer block is served from memory;
        # anything else comes from Keep.
        if locator in self.bufferblocks:
            bb = self.bufferblocks[locator]
            return str(bb.buffer_block[0:bb.write_pointer])
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def _init_bufferblock(self):
        last = self._data_locators[-1]
        streamoffset = last[OFFSET] + last[BLOCKSIZE]
        # An empty trailing block is just a placeholder; replace it rather
        # than appending after it, so the new buffer block starts at the
        # same stream offset.
        if last[BLOCKSIZE] == 0:
            del self._data_locators[-1]
        self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
        self.bufferblocks[self.current_bblock.locator] = self.current_bblock
        self._data_locators.append(self.current_bblock.locator_list_entry)

    def _repack_writes(self):
        '''Check whether the buffer block has more data than is referenced by
        actual segments (this happens when a buffered write overwrites a
        file range written in a previous buffered write).  Re-pack the
        buffer block for efficiency and to avoid leaking information.
        '''
        segs = self._files.values()[0].segments

        bufferblock_segs = []
        i = 0
        tmp_segs = copy.copy(segs)
        while i < len(tmp_segs):
            # Go through each segment and identify segments that include the buffer block
            s = tmp_segs[i]
            if s[LOCATOR] < self.current_bblock.locator_list_entry[OFFSET] and (s[LOCATOR] + s[BLOCKSIZE]) > self.current_bblock.locator_list_entry[OFFSET]:
                # The segment straddles the previous block and the current buffer block.  Split the segment.
                b1 = self.current_bblock.locator_list_entry[OFFSET] - s[LOCATOR]
                b2 = (s[LOCATOR] + s[BLOCKSIZE]) - self.current_bblock.locator_list_entry[OFFSET]
                bb_seg = [self.current_bblock.locator_list_entry[OFFSET], b2, s[OFFSET]+b1]
                tmp_segs[i] = [s[LOCATOR], b1, s[OFFSET]]
                tmp_segs.insert(i+1, bb_seg)
                bufferblock_segs.append(bb_seg)
                i += 1
            elif s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]:
                # The segment's data is in the buffer block.
                bufferblock_segs.append(s)
            i += 1

        # Sum up the segments to get the total bytes of the file that
        # actually reference data in the buffer block.
        write_total = sum([s[BLOCKSIZE] for s in bufferblock_segs])

        if write_total < self.current_bblock.locator_list_entry[BLOCKSIZE]:
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = BufferBlock(self.current_bblock.locator,
                                 self.current_bblock.locator_list_entry[OFFSET],
                                 starting_size=write_total)
            for t in bufferblock_segs:
                t_start = t[LOCATOR] - self.current_bblock.locator_list_entry[OFFSET]
                t_end = t_start + t[BLOCKSIZE]
                t[0] = self.current_bblock.locator_list_entry[OFFSET] + new_bb.write_pointer
                new_bb.append(self.current_bblock.buffer_block[t_start:t_end])

            self.current_bblock = new_bb
            self.bufferblocks[self.current_bblock.locator] = self.current_bblock
            self._data_locators[-1] = self.current_bblock.locator_list_entry
            self._files.values()[0].segments = tmp_segs
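
    # A hedged repack sketch: suppose file segments reference only bytes
    # 0..3 and 8..11 of a 12-byte buffer block (bytes 4..7 were superseded
    # by a later write).  write_total is 8, which is less than the block's
    # 12 bytes, so the two referenced ranges are copied into a fresh 8-byte
    # BufferBlock and the segments are re-pointed at their new offsets.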

    def _commit(self):
        # commit buffer block

        # TODO: do 'put' in the background?
        pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
        self._data_locators[-1][0] = pdh
        self.current_bblock = None

    def commit(self):
        with self.mutex:
            self._repack_writes()
            self._commit()

    def _append(self, data):
        if len(data) > config.KEEP_BLOCK_SIZE:
            raise errors.ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE")

        if self.current_bblock is None:
            self._init_bufferblock()

        if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
                self._commit()
                self._init_bufferblock()

        self.current_bblock.append(data)

    def append(self, data):
        with self.mutex:
            self._append(data)
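
    # A hedged end-to-end sketch (my_keep_client is a stand-in for a real
    # KeepClient; file-level writes arrive via the StreamFileWriter in
    # self._files, which maintains the segment list, while append() and
    # commit() here manage only the raw block data):
    #
    #   tokens = ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:count.txt".split()
    #   sw = StreamWriter(tokens, keep=my_keep_client)
    #   sw.append("xyz")   # buffered in a BufferBlock, not yet in Keep
    #   sw.commit()        # repacks superseded ranges, then keep.put()s the block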