3198: Fixing tests broken by previous commit's refactoring. Creating new
[arvados.git] / sdk / python / arvados / stream.py
1 import collections
2 import hashlib
3 import os
4 import re
5 import threading
6 import functools
7 import copy
8
9 from .ranges import *
10 from .arvfile import ArvadosFileBase, StreamFileReader
11 from arvados.retry import retry_method
12 from keep import *
13 import config
14 import errors
15
def normalize_stream(s, stream):
    '''Return the manifest tokens for one normalized stream.

    s is the stream name.
    stream is a dict mapping each filename to a list of segments; each
    segment exposes .locator, .block_size, .segment_offset (from the
    beginning of its block) and .segment_size attributes.

    Returns the stream as a list of tokens: the stream name, each distinct
    block locator (in order of first reference, filenames sorted), then one
    "position:size:filename" token per collapsed file segment.
    '''
    stream_tokens = [s]
    sortedfiles = sorted(stream)

    # Go through each file and add each referenced block exactly once,
    # recording every block's offset from the start of the stream.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b.locator not in blocks:
                stream_tokens.append(b.locator)
                blocks[b.locator] = streamoffset
                streamoffset += b.block_size

    # Add the empty block if the stream is otherwise empty.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        # Manifest filenames encode spaces as \040.
        fout = f.replace(' ', '\\040')
        # Add in file segments, collapsing runs that are adjacent in the
        # stream into a single token.
        current_span = None
        for segment in stream[f]:
            streamoffset = blocks[segment.locator] + segment.segment_offset
            if current_span is None:
                current_span = [streamoffset, streamoffset + segment.segment_size]
            elif streamoffset == current_span[1]:
                # Segment starts exactly where the previous one ended.
                current_span[1] += segment.segment_size
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [streamoffset, streamoffset + segment.segment_size]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        # A zero-length file still needs an explicit empty segment token.
        if not stream[f]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
63
64
class StreamReader(object):
    """Read-only view of one stream of an Arvados manifest.

    Parses a sequence of manifest tokens (stream name, block locators,
    "pos:size:filename" segments) into per-file StreamFileReader objects
    backed by a Keep client.
    """
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        self._stream_name = None
        # Range(locator, stream offset, block size) for each block, in order.
        self._data_locators = []
        # filename -> StreamFileReader, preserving manifest order.
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        # Running byte offset of the next block from the start of the stream.
        streamoffset = 0L

        # parse stream: the first token is the stream name; every later
        # token is either a block locator or a file segment.
        for tok in tokens:
            if debug: print 'tok', tok
            if self._stream_name is None:
                # \040 is the manifest escape for a space character.
                self._stream_name = tok.replace('\\040', ' ')
                continue

            # Block locator: 32-hex md5 + size, with optional +hints.
            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = long(s.group(1))
                self._data_locators.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize
                continue

            # File segment: offset into the stream, size, escaped filename.
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size)], name)
                else:
                    # Additional segments for the same filename extend the
                    # existing reader, appended after its current size.
                    filereader = self._files[name]
                    filereader.segments.append(Range(pos, filereader.size(), size))
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        """Return the stream name (with \\040 escapes decoded)."""
        return self._stream_name

    def files(self):
        """Return the filename -> StreamFileReader mapping."""
        return self._files

    def all_files(self):
        """Return the StreamFileReader for every file in the stream."""
        return self._files.values()

    def _size(self):
        # End offset of the last block; assumes at least one data locator
        # was parsed (a valid manifest stream always has one).
        n = self._data_locators[-1]
        return n.range_start + n.range_size

    def size(self):
        """Return the total size in bytes of all blocks in the stream."""
        return self._size()

    def locators_and_ranges(self, range_start, range_size):
        """Map a byte range of the stream onto the blocks that cover it."""
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        # Fetch one whole block from Keep, with retries.
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'."""
        return self._readfrom(start, size, num_retries=num_retries)

    @retry_method
    def _readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''
        if self._keep is None:
            # Lazily create a Keep client on first read.
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        # Fetch each covering block and slice out just the bytes requested.
        for lr in locators_and_ranges(self._data_locators, start, size):
            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
        return ''.join(data)

    def manifest_text(self, strip=False):
        """Return the manifest text line for this stream.

        If strip is true, block locators are reduced to the bare
        hash+size form (any +hints removed).
        """
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d.locator for d in self._data_locators])
        # Each file contributes "pos:size:name" tokens, one per segment.
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
155
156
class BufferBlock(object):
    """An append-only, automatically growing byte buffer for one Keep block.

    locator is a placeholder locator naming this buffer block;
    streamoffset is this block's offset from the start of the stream;
    starting_size is the initial capacity of the underlying bytearray.

    locator_list_entry is the [locator, size, stream offset] triple that a
    stream's data-locator list holds for this block; its size field is kept
    in sync with write_pointer on every append.
    """
    def __init__(self, locator, streamoffset, starting_size=2**16):
        self.locator = locator
        self.buffer_block = bytearray(starting_size)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self.locator_list_entry = [locator, 0, streamoffset]

    def append(self, data):
        """Append 'data' to the buffer, doubling its capacity as needed."""
        needed = self.write_pointer + len(data)
        if needed > len(self.buffer_block):
            # Compute the final power-of-two capacity first so the live
            # bytes are copied once, not once per doubling step.  The
            # max(..., 1) guards against an infinite loop when the buffer
            # was created with starting_size=0.
            new_size = max(len(self.buffer_block), 1)
            while new_size < needed:
                new_size *= 2
            new_buffer_block = bytearray(new_size)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)
        self.buffer_view[self.write_pointer:needed] = data
        self.write_pointer = needed
        self.locator_list_entry[1] = self.write_pointer
174
175
176 # class StreamWriter(StreamReader):
177 #     def __init__(self, tokens, keep=None, debug=False, _empty=False,
178 #                  num_retries=0):
179 #         super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)
180
181 #         if len(self._files) != 1:
182 #             raise AssertionError("StreamWriter can only have one file at a time")
183 #         sr = self._files.popitem()[1]
184 #         self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)
185
186 #         self.mutex = threading.Lock()
187 #         self.current_bblock = None
188 #         self.bufferblocks = {}
189
190 #     # wrap superclass methods in mutex
191 #     def _proxy_method(name):
192 #         method = getattr(StreamReader, name)
193 #         @functools.wraps(method, ('__name__', '__doc__'))
194 #         def wrapper(self, *args, **kwargs):
195 #             with self.mutex:
196 #                 return method(self, *args, **kwargs)
197 #         return wrapper
198
199 #     for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
200 #         locals()[_method_name] = _proxy_method(_method_name)
201
202 #     @retry_method
203 #     def _keepget(self, locator, num_retries=None):
204 #         if locator in self.bufferblocks:
205 #             bb = self.bufferblocks[locator]
206 #             return str(bb.buffer_block[0:bb.write_pointer])
207 #         else:
208 #             return self._keep.get(locator, num_retries=num_retries)
209
210 #     def _init_bufferblock(self):
211 #         last = self._data_locators[-1]
212 #         streamoffset = last.range_start + last.range_size
213 #         if last.range_size == 0:
214 #             del self._data_locators[-1]
215 #         self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
216 #         self.bufferblocks[self.current_bblock.locator] = self.current_bblock
217 #         self._data_locators.append(self.current_bblock.locator_list_entry)
218
219 #     def _repack_writes(self):
220 #         '''Test if the buffer block has more data than is referenced by actual segments
221 #         (this happens when a buffered write over-writes a file range written in
222 #         a previous buffered write).  Re-pack the buffer block for efficiency
223 #         and to avoid leaking information.
224 #         '''
225 #         segs = self._files.values()[0].segments
226
227 #         bufferblock_segs = []
228 #         i = 0
229 #         tmp_segs = copy.copy(segs)
230 #         while i < len(tmp_segs):
231 #             # Go through each segment and identify segments that include the buffer block
232 #             s = tmp_segs[i]
233 #             if s[LOCATOR] < self.current_bblock.locator_list_entry.range_start and (s[LOCATOR] + s.range_size) > self.current_bblock.locator_list_entry.range_start:
234 #                 # The segment straddles the previous block and the current buffer block.  Split the segment.
235 #                 b1 = self.current_bblock.locator_list_entry.range_start - s[LOCATOR]
236 #                 b2 = (s[LOCATOR] + s.range_size) - self.current_bblock.locator_list_entry.range_start
237 #                 bb_seg = [self.current_bblock.locator_list_entry.range_start, b2, s.range_start+b1]
238 #                 tmp_segs[i] = [s[LOCATOR], b1, s.range_start]
239 #                 tmp_segs.insert(i+1, bb_seg)
240 #                 bufferblock_segs.append(bb_seg)
241 #                 i += 1
242 #             elif s[LOCATOR] >= self.current_bblock.locator_list_entry.range_start:
243 #                 # The segment's data is in the buffer block.
244 #                 bufferblock_segs.append(s)
245 #             i += 1
246
247 #         # Now sum up the segments to get the total bytes
248 #         # of the file referencing into the buffer block.
249 #         write_total = sum([s.range_size for s in bufferblock_segs])
250
251 #         if write_total < self.current_bblock.locator_list_entry.range_size:
252 #             # There is more data in the buffer block than is actually accounted for by segments, so
253 #             # re-pack into a new buffer by copying over to a new buffer block.
254 #             new_bb = BufferBlock(self.current_bblock.locator,
255 #                                  self.current_bblock.locator_list_entry.range_start,
256 #                                  starting_size=write_total)
257 #             for t in bufferblock_segs:
258 #                 t_start = t[LOCATOR] - self.current_bblock.locator_list_entry.range_start
259 #                 t_end = t_start + t.range_size
260 #                 t[0] = self.current_bblock.locator_list_entry.range_start + new_bb.write_pointer
261 #                 new_bb.append(self.current_bblock.buffer_block[t_start:t_end])
262
263 #             self.current_bblock = new_bb
264 #             self.bufferblocks[self.current_bblock.locator] = self.current_bblock
265 #             self._data_locators[-1] = self.current_bblock.locator_list_entry
266 #             self._files.values()[0].segments = tmp_segs
267
268 #     def _commit(self):
269 #         # commit buffer block
270
271 #         # TODO: do 'put' in the background?
272 #         pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
273 #         self._data_locators[-1][0] = pdh
274 #         self.current_bblock = None
275
276 #     def commit(self):
277 #         with self.mutex:
278 #             self._repack_writes()
279 #             self._commit()
280
281 #     def _append(self, data):
282 #         if len(data) > config.KEEP_BLOCK_SIZE:
283 #             raise ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE")
284
285 #         if self.current_bblock is None:
286 #             self._init_bufferblock()
287
288 #         if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
289 #             self._repack_writes()
290 #             if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
291 #                 self._commit()
292 #                 self._init_bufferblock()
293
294 #         self.current_bblock.append(data)
295
296 #     def append(self, data):
297 #         with self.mutex:
298 #             self._append(data)