Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / stream.py
import collections
import hashlib
import os
import re
import threading
import functools
import copy

from .ranges import *
from .arvfile import ArvadosFileBase, StreamFileReader
from arvados.retry import retry_method
from keep import *
import config
import errors

def locator_block_size(loc):
    """Return the data size, in bytes, encoded in a Keep block locator.

    For example, locator_block_size("acbd18db4cc2f85cedef654fccc4a4d8+3") == 3.
    """
    s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', loc)
    return long(s.group(1))

def normalize_stream(s, stream):
    '''
    s is the stream name
    stream is a dict mapping each filename to a list of segments, each of
    which carries a block locator, the block size, the segment offset (from
    the beginning of the block), and the segment size
    returns the stream as a list of manifest tokens
    '''
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    # Go through each file and add each referenced block exactly once.
    for f in sortedfiles:
        for b in stream[f]:
            if b.locator not in blocks:
                stream_tokens.append(b.locator)
                blocks[b.locator] = streamoffset
                streamoffset += locator_block_size(b.locator)

    # Add the empty block if the stream is otherwise empty.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        # Add in file segments
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            # Collapse adjacent segments
            streamoffset = blocks[segment.locator] + segment.segment_offset
            if current_span is None:
                current_span = [streamoffset, streamoffset + segment.segment_size]
            else:
                if streamoffset == current_span[1]:
                    current_span[1] += segment.segment_size
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [streamoffset, streamoffset + segment.segment_size]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
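
# Illustrative example (hash is the md5 of "foo"): for a stream "." whose only
# file "count.txt" has a single 3-byte segment at offset 0 of block
# "acbd18db4cc2f85cedef654fccc4a4d8+3", normalize_stream(".", stream) returns
#   ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:count.txt']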


class StreamReader(object):
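    """Read-only access to a single stream of an Arvados manifest.

    A stream is parsed from a list of manifest tokens: the stream name,
    followed by one or more block locators, followed by one or more
    "position:size:filename" file segment tokens.  Block data is fetched
    from Keep on demand when readfrom() is called.
    """
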
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        streamoffset = 0L

        # parse stream
        for tok in tokens:
            if debug: print 'tok', tok
            if self._stream_name is None:
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = long(s.group(1))
                self._data_locators.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize
                continue

            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size)], name)
                else:
                    filereader = self._files[name]
                    filereader.segments.append(Range(pos, filereader.size(), size))
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        return self._stream_name

    def files(self):
        return self._files

    def all_files(self):
        return self._files.values()

    def _size(self):
        n = self._data_locators[-1]
        return n.range_start + n.range_size

    def size(self):
        return self._size()

    def locators_and_ranges(self, range_start, range_size):
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        return self._readfrom(start, size, num_retries=num_retries)

    @retry_method
    def _readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''
        if self._keep is None:
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for lr in locators_and_ranges(self._data_locators, start, size):
            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
        return ''.join(data)
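    # Illustrative usage (assumes a configured KeepClient instance named
    # keep_client; the hash is the md5 of "foo", so the block would hold 'foo'):
    #   reader = StreamReader(['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:count.txt'],
    #                         keep=keep_client)
    #   reader.readfrom(0, 3)  # fetches the block from Keep and would return 'foo'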

    def manifest_text(self, strip=False):
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d.locator for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
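    # Illustrative round trip (hash is the md5 of "foo"):
    #   tokens = ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:count.txt']
    #   StreamReader(tokens).manifest_text()
    #   # => '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:count.txt\n'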


# class StreamWriter(StreamReader):
#     def __init__(self, tokens, keep=None, debug=False, _empty=False,
#                  num_retries=0):
#         super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)

#         if len(self._files) != 1:
#             raise AssertionError("StreamWriter can only have one file at a time")
#         sr = self._files.popitem()[1]
#         self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)

#         self.mutex = threading.Lock()
#         self.current_bblock = None
#         self.bufferblocks = {}

#     # wrap superclass methods in mutex
#     def _proxy_method(name):
#         method = getattr(StreamReader, name)
#         @functools.wraps(method, ('__name__', '__doc__'))
#         def wrapper(self, *args, **kwargs):
#             with self.mutex:
#                 return method(self, *args, **kwargs)
#         return wrapper

#     for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
#         locals()[_method_name] = _proxy_method(_method_name)

#     @retry_method
#     def _keepget(self, locator, num_retries=None):
#         if locator in self.bufferblocks:
#             bb = self.bufferblocks[locator]
#             return str(bb.buffer_block[0:bb.write_pointer])
#         else:
#             return self._keep.get(locator, num_retries=num_retries)

#     def _init_bufferblock(self):
#         last = self._data_locators[-1]
#         streamoffset = last.range_start + last.range_size
#         if last.range_size == 0:
#             del self._data_locators[-1]
#         self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
#         self.bufferblocks[self.current_bblock.locator] = self.current_bblock
#         self._data_locators.append(self.current_bblock.locator_list_entry)

#     def _repack_writes(self):
#         '''Test if the buffer block has more data than is referenced by actual segments
#         (this happens when a buffered write over-writes a file range written in
#         a previous buffered write).  Re-pack the buffer block for efficiency
#         and to avoid leaking information.
#         '''
#         segs = self._files.values()[0].segments

#         bufferblock_segs = []
#         i = 0
#         tmp_segs = copy.copy(segs)
#         while i < len(tmp_segs):
#             # Go through each segment and identify segments that include the buffer block
#             s = tmp_segs[i]
#             if s[LOCATOR] < self.current_bblock.locator_list_entry.range_start and (s[LOCATOR] + s.range_size) > self.current_bblock.locator_list_entry.range_start:
#                 # The segment straddles the previous block and the current buffer block.  Split the segment.
#                 b1 = self.current_bblock.locator_list_entry.range_start - s[LOCATOR]
#                 b2 = (s[LOCATOR] + s.range_size) - self.current_bblock.locator_list_entry.range_start
#                 bb_seg = [self.current_bblock.locator_list_entry.range_start, b2, s.range_start+b1]
#                 tmp_segs[i] = [s[LOCATOR], b1, s.range_start]
#                 tmp_segs.insert(i+1, bb_seg)
#                 bufferblock_segs.append(bb_seg)
#                 i += 1
#             elif s[LOCATOR] >= self.current_bblock.locator_list_entry.range_start:
#                 # The segment's data is in the buffer block.
#                 bufferblock_segs.append(s)
#             i += 1

#         # Now sum up the segments to get the total bytes
#         # of the file referencing into the buffer block.
#         write_total = sum([s.range_size for s in bufferblock_segs])

#         if write_total < self.current_bblock.locator_list_entry.range_size:
#             # There is more data in the buffer block than is actually accounted for by segments, so
#             # re-pack into a new buffer by copying over to a new buffer block.
#             new_bb = BufferBlock(self.current_bblock.locator,
#                                  self.current_bblock.locator_list_entry.range_start,
#                                  starting_size=write_total)
#             for t in bufferblock_segs:
#                 t_start = t[LOCATOR] - self.current_bblock.locator_list_entry.range_start
#                 t_end = t_start + t.range_size
#                 t[0] = self.current_bblock.locator_list_entry.range_start + new_bb.write_pointer
#                 new_bb.append(self.current_bblock.buffer_block[t_start:t_end])

#             self.current_bblock = new_bb
#             self.bufferblocks[self.current_bblock.locator] = self.current_bblock
#             self._data_locators[-1] = self.current_bblock.locator_list_entry
#             self._files.values()[0].segments = tmp_segs

#     def _commit(self):
#         # commit buffer block

#         # TODO: do 'put' in the background?
#         pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
#         self._data_locators[-1][0] = pdh
#         self.current_bblock = None

#     def commit(self):
#         with self.mutex:
#             self._repack_writes()
#             self._commit()

#     def _append(self, data):
#         if len(data) > config.KEEP_BLOCK_SIZE:
#             raise ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE")

#         if self.current_bblock is None:
#             self._init_bufferblock()

#         if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
#             self._repack_writes()
#             if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
#                 self._commit()
#                 self._init_bufferblock()

#         self.current_bblock.append(data)

#     def append(self, data):
#         with self.mutex:
#             self._append(data)