Merge branch 'master' into 3112-report-bug
[arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import httplib
3 import httplib2
4 import os
5 import pprint
6 import sys
7 import types
8 import subprocess
9 import json
10 import UserDict
11 import re
12 import hashlib
13 import string
14 import bz2
15 import zlib
16 import fcntl
17 import time
18 import threading
19 import collections
20
21 from keep import *
22 import config
23 import errors
24
25 LOCATOR = 0
26 BLOCKSIZE = 1
27 OFFSET = 2
28 SEGMENTSIZE = 3
29
30 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
31     '''
32     Get blocks that are covered by the range
33     data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
34     range_start: start of range
35     range_size: size of range
36     returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
37     '''
38     if range_size == 0:
39         return []
40     resp = []
41     range_start = long(range_start)
42     range_size = long(range_size)
43     range_end = range_start + range_size
44     block_start = 0L
45
46     # range_start/block_start is the inclusive lower bound
47     # range_end/block_end is the exclusive upper bound
48
49     hi = len(data_locators)
50     lo = 0
51     i = int((hi + lo) / 2)
52     block_size = data_locators[i][BLOCKSIZE]
53     block_start = data_locators[i][OFFSET]
54     block_end = block_start + block_size
55     if debug: print '---'
56
57     # perform a binary search for the first block
58     # assumes that all of the blocks are contigious, so range_start is guaranteed
59     # to either fall into the range of a block or be outside the block range entirely
60     while not (range_start >= block_start and range_start < block_end):
61         if lo == i:
62             # must be out of range, fail
63             return []
64         if range_start > block_start:
65             lo = i
66         else:
67             hi = i
68         i = int((hi + lo) / 2)
69         if debug: print lo, i, hi
70         block_size = data_locators[i][BLOCKSIZE]
71         block_start = data_locators[i][OFFSET]
72         block_end = block_start + block_size
73
74     while i < len(data_locators):
75         locator, block_size, block_start = data_locators[i]
76         block_end = block_start + block_size
77         if debug:
78             print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
79         if range_end <= block_start:
80             # range ends before this block starts, so don't look at any more locators
81             break
82
83         #if range_start >= block_end:
84             # range starts after this block ends, so go to next block
85             # we should always start at the first block due to the binary above, so this test is redundant
86             #next
87
88         if range_start >= block_start and range_end <= block_end:
89             # range starts and ends in this block
90             resp.append([locator, block_size, range_start - block_start, range_size])
91         elif range_start >= block_start and range_end > block_end:
92             # range starts in this block
93             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
94         elif range_start < block_start and range_end > block_end:
95             # range starts in a previous block and extends to further blocks
96             resp.append([locator, block_size, 0L, block_size])
97         elif range_start < block_start and range_end <= block_end:
98             # range starts in a previous block and ends in this block
99             resp.append([locator, block_size, 0L, range_end - block_start])
100         block_start = block_end
101         i += 1
102     return resp
103
104
105 class StreamFileReader(object):
106     def __init__(self, stream, segments, name):
107         self._stream = stream
108         self.segments = segments
109         self._name = name
110         self._filepos = 0L
111
112     def name(self):
113         return self._name
114
115     def decompressed_name(self):
116         return re.sub('\.(bz2|gz)$', '', self._name)
117
118     def stream_name(self):
119         return self._stream.name()
120
121     def seek(self, pos):
122         self._filepos = min(max(pos, 0L), self.size())
123
124     def tell(self):
125         return self._filepos
126
127     def size(self):
128         n = self.segments[-1]
129         return n[OFFSET] + n[BLOCKSIZE]
130
131     def read(self, size):
132         """Read up to 'size' bytes from the stream, starting at the current file position"""
133         if size == 0:
134             return ''
135
136         data = ''
137         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
138             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
139         self._filepos += len(data)
140         return data
141
142     def readfrom(self, start, size):
143         """Read up to 'size' bytes from the stream, starting at 'start'"""
144         if size == 0:
145             return ''
146
147         data = ''
148         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
149             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
150         return data
151
152     def readall(self, size=2**20):
153         while True:
154             data = self.read(size)
155             if data == '':
156                 break
157             yield data
158
159     def decompress(self, decompress, size):
160         for segment in self.readall(size):
161             data = decompress(segment)
162             if data and data != '':
163                 yield data
164
165     def readall_decompressed(self, size=2**20):
166         self.seek(0)
167         if re.search('\.bz2$', self._name):
168             dc = bz2.BZ2Decompressor()
169             return self.decompress(lambda segment: dc.decompress(segment), size)
170         elif re.search('\.gz$', self._name):
171             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
172             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
173         else:
174             return self.readall(size)
175
176     def readlines(self, decompress=True):
177         if decompress:
178             datasource = self.readall_decompressed()
179         else:
180             datasource = self.readall()
181         data = ''
182         for newdata in datasource:
183             data += newdata
184             sol = 0
185             while True:
186                 eol = string.find(data, "\n", sol)
187                 if eol < 0:
188                     break
189                 yield data[sol:eol+1]
190                 sol = eol+1
191             data = data[sol:]
192         if data != '':
193             yield data
194
195     def as_manifest(self):
196         manifest_text = ['.']
197         manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
198         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
199         return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
200
201 class StreamReader(object):
202     def __init__(self, tokens, keep=None, debug=False, _empty=False):
203         self._stream_name = None
204         self._data_locators = []
205         self._files = collections.OrderedDict()
206
207         if keep != None:
208             self._keep = keep
209         else:
210             self._keep = Keep.global_client_object()
211
212         streamoffset = 0L
213
214         # parse stream
215         for tok in tokens:
216             if debug: print 'tok', tok
217             if self._stream_name == None:
218                 self._stream_name = tok.replace('\\040', ' ')
219                 continue
220
221             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
222             if s:
223                 blocksize = long(s.group(1))
224                 self._data_locators.append([tok, blocksize, streamoffset])
225                 streamoffset += blocksize
226                 continue
227
228             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
229             if s:
230                 pos = long(s.group(1))
231                 size = long(s.group(2))
232                 name = s.group(3).replace('\\040', ' ')
233                 if name not in self._files:
234                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
235                 else:
236                     n = self._files[name]
237                     n.segments.append([pos, size, n.size()])
238                 continue
239
240             raise errors.SyntaxError("Invalid manifest format")
241
242     def name(self):
243         return self._stream_name
244
245     def files(self):
246         return self._files
247
248     def all_files(self):
249         return self._files.values()
250
251     def size(self):
252         n = self._data_locators[-1]
253         return n[OFFSET] + n[BLOCKSIZE]
254
255     def locators_and_ranges(self, range_start, range_size):
256         return locators_and_ranges(self._data_locators, range_start, range_size)
257
258     def readfrom(self, start, size):
259         """Read up to 'size' bytes from the stream, starting at 'start'"""
260         if size == 0:
261             return ''
262         data = ''
263         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
264             data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
265         return data
266
267     def manifest_text(self, strip=False):
268         manifest_text = [self.name().replace(' ', '\\040')]
269         if strip:
270             for d in self._data_locators:
271                 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
272                 manifest_text.append(m.group(0))
273         else:
274             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
275         manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
276                                         for seg in f.segments])
277                               for f in self._files.values()])
278         return ' '.join(manifest_text) + '\n'