Merge branch 'master' into 4194-keep-logging
[arvados.git] / sdk / python / arvados / stream.py
1 import bz2
2 import collections
3 import hashlib
4 import os
5 import re
6 import zlib
7
8 from .arvfile import ArvadosFileBase
9 from arvados.retry import retry_method
10 from keep import *
11 import config
12 import errors
13
14 LOCATOR = 0
15 BLOCKSIZE = 1
16 OFFSET = 2
17 SEGMENTSIZE = 3
18
19 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
20     '''
21     Get blocks that are covered by the range
22     data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
23     range_start: start of range
24     range_size: size of range
25     returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
26     '''
27     if range_size == 0:
28         return []
29     resp = []
30     range_start = long(range_start)
31     range_size = long(range_size)
32     range_end = range_start + range_size
33     block_start = 0L
34
35     # range_start/block_start is the inclusive lower bound
36     # range_end/block_end is the exclusive upper bound
37
38     hi = len(data_locators)
39     lo = 0
40     i = int((hi + lo) / 2)
41     block_size = data_locators[i][BLOCKSIZE]
42     block_start = data_locators[i][OFFSET]
43     block_end = block_start + block_size
44     if debug: print '---'
45
46     # perform a binary search for the first block
47     # assumes that all of the blocks are contigious, so range_start is guaranteed
48     # to either fall into the range of a block or be outside the block range entirely
49     while not (range_start >= block_start and range_start < block_end):
50         if lo == i:
51             # must be out of range, fail
52             return []
53         if range_start > block_start:
54             lo = i
55         else:
56             hi = i
57         i = int((hi + lo) / 2)
58         if debug: print lo, i, hi
59         block_size = data_locators[i][BLOCKSIZE]
60         block_start = data_locators[i][OFFSET]
61         block_end = block_start + block_size
62
63     while i < len(data_locators):
64         locator, block_size, block_start = data_locators[i]
65         block_end = block_start + block_size
66         if debug:
67             print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
68         if range_end <= block_start:
69             # range ends before this block starts, so don't look at any more locators
70             break
71
72         #if range_start >= block_end:
73             # range starts after this block ends, so go to next block
74             # we should always start at the first block due to the binary above, so this test is redundant
75             #next
76
77         if range_start >= block_start and range_end <= block_end:
78             # range starts and ends in this block
79             resp.append([locator, block_size, range_start - block_start, range_size])
80         elif range_start >= block_start and range_end > block_end:
81             # range starts in this block
82             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
83         elif range_start < block_start and range_end > block_end:
84             # range starts in a previous block and extends to further blocks
85             resp.append([locator, block_size, 0L, block_size])
86         elif range_start < block_start and range_end <= block_end:
87             # range starts in a previous block and ends in this block
88             resp.append([locator, block_size, 0L, range_end - block_start])
89         block_start = block_end
90         i += 1
91     return resp
92
93 def split(path):
94     """split(path) -> streamname, filename
95
96     Separate the stream name and file name in a /-separated stream path.
97     If no stream name is available, assume '.'.
98     """
99     try:
100         stream_name, file_name = path.rsplit('/', 1)
101     except ValueError:  # No / in string
102         stream_name, file_name = '.', path
103     return stream_name, file_name
104
105 class StreamFileReader(ArvadosFileBase):
106     class _NameAttribute(str):
107         # The Python file API provides a plain .name attribute.
108         # Older SDK provided a name() method.
109         # This class provides both, for maximum compatibility.
110         def __call__(self):
111             return self
112
113
114     def __init__(self, stream, segments, name):
115         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
116         self._stream = stream
117         self.segments = segments
118         self._filepos = 0L
119         self.num_retries = stream.num_retries
120         self._readline_cache = (None, None)
121
122     def __iter__(self):
123         while True:
124             data = self.readline()
125             if not data:
126                 break
127             yield data
128
129     def decompressed_name(self):
130         return re.sub('\.(bz2|gz)$', '', self.name)
131
132     def stream_name(self):
133         return self._stream.name()
134
135     @ArvadosFileBase._before_close
136     def seek(self, pos, whence=os.SEEK_CUR):
137         if whence == os.SEEK_CUR:
138             pos += self._filepos
139         elif whence == os.SEEK_END:
140             pos += self.size()
141         self._filepos = min(max(pos, 0L), self.size())
142
143     def tell(self):
144         return self._filepos
145
146     def size(self):
147         n = self.segments[-1]
148         return n[OFFSET] + n[BLOCKSIZE]
149
150     @ArvadosFileBase._before_close
151     @retry_method
152     def read(self, size, num_retries=None):
153         """Read up to 'size' bytes from the stream, starting at the current file position"""
154         if size == 0:
155             return ''
156
157         data = ''
158         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
159         if available_chunks:
160             locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
161             data = self._stream.readfrom(locator+segmentoffset, segmentsize,
162                                          num_retries=num_retries)
163
164         self._filepos += len(data)
165         return data
166
167     @ArvadosFileBase._before_close
168     @retry_method
169     def readfrom(self, start, size, num_retries=None):
170         """Read up to 'size' bytes from the stream, starting at 'start'"""
171         if size == 0:
172             return ''
173
174         data = []
175         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
176             data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
177                                               num_retries=num_retries))
178         return ''.join(data)
179
180     @ArvadosFileBase._before_close
181     @retry_method
182     def readall(self, size=2**20, num_retries=None):
183         while True:
184             data = self.read(size, num_retries=num_retries)
185             if data == '':
186                 break
187             yield data
188
189     @ArvadosFileBase._before_close
190     @retry_method
191     def readline(self, size=float('inf'), num_retries=None):
192         cache_pos, cache_data = self._readline_cache
193         if self.tell() == cache_pos:
194             data = [cache_data]
195         else:
196             data = ['']
197         data_size = len(data[-1])
198         while (data_size < size) and ('\n' not in data[-1]):
199             next_read = self.read(2 ** 20, num_retries=num_retries)
200             if not next_read:
201                 break
202             data.append(next_read)
203             data_size += len(next_read)
204         data = ''.join(data)
205         try:
206             nextline_index = data.index('\n') + 1
207         except ValueError:
208             nextline_index = len(data)
209         nextline_index = min(nextline_index, size)
210         self._readline_cache = (self.tell(), data[nextline_index:])
211         return data[:nextline_index]
212
213     @ArvadosFileBase._before_close
214     @retry_method
215     def decompress(self, decompress, size, num_retries=None):
216         for segment in self.readall(size, num_retries):
217             data = decompress(segment)
218             if data:
219                 yield data
220
221     @ArvadosFileBase._before_close
222     @retry_method
223     def readall_decompressed(self, size=2**20, num_retries=None):
224         self.seek(0)
225         if self.name.endswith('.bz2'):
226             dc = bz2.BZ2Decompressor()
227             return self.decompress(dc.decompress, size,
228                                    num_retries=num_retries)
229         elif self.name.endswith('.gz'):
230             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
231             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
232                                    size, num_retries=num_retries)
233         else:
234             return self.readall(size, num_retries=num_retries)
235
236     @ArvadosFileBase._before_close
237     @retry_method
238     def readlines(self, sizehint=float('inf'), num_retries=None):
239         data = []
240         data_size = 0
241         for s in self.readall(num_retries=num_retries):
242             data.append(s)
243             data_size += len(s)
244             if data_size >= sizehint:
245                 break
246         return ''.join(data).splitlines(True)
247
248     def as_manifest(self):
249         manifest_text = ['.']
250         manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
251         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
252         return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
253
254
255 class StreamReader(object):
256     def __init__(self, tokens, keep=None, debug=False, _empty=False,
257                  num_retries=0):
258         self._stream_name = None
259         self._data_locators = []
260         self._files = collections.OrderedDict()
261         self._keep = keep
262         self.num_retries = num_retries
263
264         streamoffset = 0L
265
266         # parse stream
267         for tok in tokens:
268             if debug: print 'tok', tok
269             if self._stream_name is None:
270                 self._stream_name = tok.replace('\\040', ' ')
271                 continue
272
273             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
274             if s:
275                 blocksize = long(s.group(1))
276                 self._data_locators.append([tok, blocksize, streamoffset])
277                 streamoffset += blocksize
278                 continue
279
280             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
281             if s:
282                 pos = long(s.group(1))
283                 size = long(s.group(2))
284                 name = s.group(3).replace('\\040', ' ')
285                 if name not in self._files:
286                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
287                 else:
288                     n = self._files[name]
289                     n.segments.append([pos, size, n.size()])
290                 continue
291
292             raise errors.SyntaxError("Invalid manifest format")
293
294     def name(self):
295         return self._stream_name
296
297     def files(self):
298         return self._files
299
300     def all_files(self):
301         return self._files.values()
302
303     def size(self):
304         n = self._data_locators[-1]
305         return n[OFFSET] + n[BLOCKSIZE]
306
307     def locators_and_ranges(self, range_start, range_size):
308         return locators_and_ranges(self._data_locators, range_start, range_size)
309
310     @retry_method
311     def readfrom(self, start, size, num_retries=None):
312         """Read up to 'size' bytes from the stream, starting at 'start'"""
313         if size == 0:
314             return ''
315         if self._keep is None:
316             self._keep = KeepClient(num_retries=self.num_retries)
317         data = []
318         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
319             data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
320         return ''.join(data)
321
322     def manifest_text(self, strip=False):
323         manifest_text = [self.name().replace(' ', '\\040')]
324         if strip:
325             for d in self._data_locators:
326                 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
327                 manifest_text.append(m.group(0))
328         else:
329             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
330         manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
331                                         for seg in f.segments])
332                               for f in self._files.values()])
333         return ' '.join(manifest_text) + '\n'