9b90cea2e07718ed6ef477cf6cca25c1acf64add
[arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import os
3 import pprint
4 import sys
5 import types
6 import subprocess
7 import json
8 import UserDict
9 import re
10 import hashlib
11 import string
12 import bz2
13 import zlib
14 import fcntl
15 import time
16 import threading
17 import collections
18
19 from arvados.retry import retry_method
20 from keep import *
21 import config
22 import errors
23
24 LOCATOR = 0
25 BLOCKSIZE = 1
26 OFFSET = 2
27 SEGMENTSIZE = 3
28
29 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
30     '''
31     Get blocks that are covered by the range
32     data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
33     range_start: start of range
34     range_size: size of range
35     returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
36     '''
37     if range_size == 0:
38         return []
39     resp = []
40     range_start = long(range_start)
41     range_size = long(range_size)
42     range_end = range_start + range_size
43     block_start = 0L
44
45     # range_start/block_start is the inclusive lower bound
46     # range_end/block_end is the exclusive upper bound
47
48     hi = len(data_locators)
49     lo = 0
50     i = int((hi + lo) / 2)
51     block_size = data_locators[i][BLOCKSIZE]
52     block_start = data_locators[i][OFFSET]
53     block_end = block_start + block_size
54     if debug: print '---'
55
56     # perform a binary search for the first block
57     # assumes that all of the blocks are contigious, so range_start is guaranteed
58     # to either fall into the range of a block or be outside the block range entirely
59     while not (range_start >= block_start and range_start < block_end):
60         if lo == i:
61             # must be out of range, fail
62             return []
63         if range_start > block_start:
64             lo = i
65         else:
66             hi = i
67         i = int((hi + lo) / 2)
68         if debug: print lo, i, hi
69         block_size = data_locators[i][BLOCKSIZE]
70         block_start = data_locators[i][OFFSET]
71         block_end = block_start + block_size
72
73     while i < len(data_locators):
74         locator, block_size, block_start = data_locators[i]
75         block_end = block_start + block_size
76         if debug:
77             print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
78         if range_end <= block_start:
79             # range ends before this block starts, so don't look at any more locators
80             break
81
82         #if range_start >= block_end:
83             # range starts after this block ends, so go to next block
84             # we should always start at the first block due to the binary above, so this test is redundant
85             #next
86
87         if range_start >= block_start and range_end <= block_end:
88             # range starts and ends in this block
89             resp.append([locator, block_size, range_start - block_start, range_size])
90         elif range_start >= block_start and range_end > block_end:
91             # range starts in this block
92             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
93         elif range_start < block_start and range_end > block_end:
94             # range starts in a previous block and extends to further blocks
95             resp.append([locator, block_size, 0L, block_size])
96         elif range_start < block_start and range_end <= block_end:
97             # range starts in a previous block and ends in this block
98             resp.append([locator, block_size, 0L, range_end - block_start])
99         block_start = block_end
100         i += 1
101     return resp
102
103
104 class StreamFileReader(object):
105     def __init__(self, stream, segments, name):
106         self._stream = stream
107         self.segments = segments
108         self._name = name
109         self._filepos = 0L
110         self.num_retries = stream.num_retries
111
112     def name(self):
113         return self._name
114
115     def decompressed_name(self):
116         return re.sub('\.(bz2|gz)$', '', self._name)
117
118     def stream_name(self):
119         return self._stream.name()
120
121     def seek(self, pos):
122         self._filepos = min(max(pos, 0L), self.size())
123
124     def tell(self):
125         return self._filepos
126
127     def size(self):
128         n = self.segments[-1]
129         return n[OFFSET] + n[BLOCKSIZE]
130
131     @retry_method
132     def read(self, size, num_retries=None):
133         """Read up to 'size' bytes from the stream, starting at the current file position"""
134         if size == 0:
135             return ''
136
137         data = ''
138         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
139         if available_chunks:
140             locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
141             data = self._stream.readfrom(locator+segmentoffset, segmentsize,
142                                          num_retries=num_retries)
143
144         self._filepos += len(data)
145         return data
146
147     @retry_method
148     def readfrom(self, start, size, num_retries=None):
149         """Read up to 'size' bytes from the stream, starting at 'start'"""
150         if size == 0:
151             return ''
152
153         data = []
154         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
155             data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
156                                               num_retries=num_retries))
157         return ''.join(data)
158
159     @retry_method
160     def readall(self, size=2**20, num_retries=None):
161         while True:
162             data = self.read(size, num_retries=num_retries)
163             if data == '':
164                 break
165             yield data
166
167     @retry_method
168     def decompress(self, decompress, size, num_retries=None):
169         for segment in self.readall(size, num_retries):
170             data = decompress(segment)
171             if data and data != '':
172                 yield data
173
174     @retry_method
175     def readall_decompressed(self, size=2**20, num_retries=None):
176         self.seek(0)
177         if re.search('\.bz2$', self._name):
178             dc = bz2.BZ2Decompressor()
179             return self.decompress(dc.decompress, size,
180                                    num_retries=num_retries)
181         elif re.search('\.gz$', self._name):
182             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
183             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
184                                    size, num_retries=num_retries)
185         else:
186             return self.readall(size, num_retries=num_retries)
187
188     @retry_method
189     def readlines(self, decompress=True, num_retries=None):
190         read_func = self.readall_decompressed if decompress else self.readall
191         data = ''
192         for newdata in read_func(num_retries=num_retries):
193             data += newdata
194             sol = 0
195             while True:
196                 eol = string.find(data, "\n", sol)
197                 if eol < 0:
198                     break
199                 yield data[sol:eol+1]
200                 sol = eol+1
201             data = data[sol:]
202         if data != '':
203             yield data
204
205     def as_manifest(self):
206         manifest_text = ['.']
207         manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
208         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
209         return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
210
211
212 class StreamReader(object):
213     def __init__(self, tokens, keep=None, debug=False, _empty=False,
214                  num_retries=0):
215         self._stream_name = None
216         self._data_locators = []
217         self._files = collections.OrderedDict()
218         self._keep = keep
219         self.num_retries = num_retries
220
221         streamoffset = 0L
222
223         # parse stream
224         for tok in tokens:
225             if debug: print 'tok', tok
226             if self._stream_name is None:
227                 self._stream_name = tok.replace('\\040', ' ')
228                 continue
229
230             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
231             if s:
232                 blocksize = long(s.group(1))
233                 self._data_locators.append([tok, blocksize, streamoffset])
234                 streamoffset += blocksize
235                 continue
236
237             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
238             if s:
239                 pos = long(s.group(1))
240                 size = long(s.group(2))
241                 name = s.group(3).replace('\\040', ' ')
242                 if name not in self._files:
243                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
244                 else:
245                     n = self._files[name]
246                     n.segments.append([pos, size, n.size()])
247                 continue
248
249             raise errors.SyntaxError("Invalid manifest format")
250
251     def name(self):
252         return self._stream_name
253
254     def files(self):
255         return self._files
256
257     def all_files(self):
258         return self._files.values()
259
260     def size(self):
261         n = self._data_locators[-1]
262         return n[OFFSET] + n[BLOCKSIZE]
263
264     def locators_and_ranges(self, range_start, range_size):
265         return locators_and_ranges(self._data_locators, range_start, range_size)
266
267     @retry_method
268     def readfrom(self, start, size, num_retries=None):
269         """Read up to 'size' bytes from the stream, starting at 'start'"""
270         if size == 0:
271             return ''
272         if self._keep is None:
273             self._keep = KeepClient(num_retries=self.num_retries)
274         data = []
275         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
276             data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
277         return ''.join(data)
278
279     def manifest_text(self, strip=False):
280         manifest_text = [self.name().replace(' ', '\\040')]
281         if strip:
282             for d in self._data_locators:
283                 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
284                 manifest_text.append(m.group(0))
285         else:
286             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
287         manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
288                                         for seg in f.segments])
289                               for f in self._files.values()])
290         return ' '.join(manifest_text) + '\n'