Merge branch 'master' into 3296-user-profile
[arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import collections
21
22 from keep import *
23 import config
24 import errors
25
26 LOCATOR = 0
27 BLOCKSIZE = 1
28 OFFSET = 2
29 SEGMENTSIZE = 3
30
31 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
32     '''
33     Get blocks that are covered by the range
34     data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
35     range_start: start of range
36     range_size: size of range
37     returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
38     '''
39     if range_size == 0:
40         return []
41     resp = []
42     range_start = long(range_start)
43     range_size = long(range_size)
44     range_end = range_start + range_size
45     block_start = 0L
46
47     # range_start/block_start is the inclusive lower bound
48     # range_end/block_end is the exclusive upper bound
49
50     hi = len(data_locators)
51     lo = 0
52     i = int((hi + lo) / 2)
53     block_size = data_locators[i][BLOCKSIZE]
54     block_start = data_locators[i][OFFSET]
55     block_end = block_start + block_size
56     if debug: print '---'
57
58     # perform a binary search for the first block
59     # assumes that all of the blocks are contigious, so range_start is guaranteed
60     # to either fall into the range of a block or be outside the block range entirely
61     while not (range_start >= block_start and range_start < block_end):
62         if lo == i:
63             # must be out of range, fail
64             return []
65         if range_start > block_start:
66             lo = i
67         else:
68             hi = i
69         i = int((hi + lo) / 2)
70         if debug: print lo, i, hi
71         block_size = data_locators[i][BLOCKSIZE]
72         block_start = data_locators[i][OFFSET]
73         block_end = block_start + block_size
74
75     while i < len(data_locators):
76         locator, block_size, block_start = data_locators[i]
77         block_end = block_start + block_size
78         if debug:
79             print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
80         if range_end <= block_start:
81             # range ends before this block starts, so don't look at any more locators
82             break
83
84         #if range_start >= block_end:
85             # range starts after this block ends, so go to next block
86             # we should always start at the first block due to the binary above, so this test is redundant
87             #next
88
89         if range_start >= block_start and range_end <= block_end:
90             # range starts and ends in this block
91             resp.append([locator, block_size, range_start - block_start, range_size])
92         elif range_start >= block_start and range_end > block_end:
93             # range starts in this block
94             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
95         elif range_start < block_start and range_end > block_end:
96             # range starts in a previous block and extends to further blocks
97             resp.append([locator, block_size, 0L, block_size])
98         elif range_start < block_start and range_end <= block_end:
99             # range starts in a previous block and ends in this block
100             resp.append([locator, block_size, 0L, range_end - block_start])
101         block_start = block_end
102         i += 1
103     return resp
104
105
106 class StreamFileReader(object):
107     def __init__(self, stream, segments, name):
108         self._stream = stream
109         self.segments = segments
110         self._name = name
111         self._filepos = 0L
112
113     def name(self):
114         return self._name
115
116     def decompressed_name(self):
117         return re.sub('\.(bz2|gz)$', '', self._name)
118
119     def stream_name(self):
120         return self._stream.name()
121
122     def seek(self, pos):
123         self._filepos = min(max(pos, 0L), self.size())
124
125     def tell(self):
126         return self._filepos
127
128     def size(self):
129         n = self.segments[-1]
130         return n[OFFSET] + n[BLOCKSIZE]
131
132     def read(self, size):
133         """Read up to 'size' bytes from the stream, starting at the current file position"""
134         if size == 0:
135             return ''
136
137         data = ''
138         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
139             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
140         self._filepos += len(data)
141         return data
142
143     def readfrom(self, start, size):
144         """Read up to 'size' bytes from the stream, starting at 'start'"""
145         if size == 0:
146             return ''
147
148         data = ''
149         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
150             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
151         return data
152
153     def readall(self, size=2**20):
154         while True:
155             data = self.read(size)
156             if data == '':
157                 break
158             yield data
159
160     def decompress(self, decompress, size):
161         for segment in self.readall(size):
162             data = decompress(segment)
163             if data and data != '':
164                 yield data
165
166     def readall_decompressed(self, size=2**20):
167         self.seek(0)
168         if re.search('\.bz2$', self._name):
169             dc = bz2.BZ2Decompressor()
170             return self.decompress(lambda segment: dc.decompress(segment), size)
171         elif re.search('\.gz$', self._name):
172             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
173             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
174         else:
175             return self.readall(size)
176
177     def readlines(self, decompress=True):
178         if decompress:
179             datasource = self.readall_decompressed()
180         else:
181             datasource = self.readall()
182         data = ''
183         for newdata in datasource:
184             data += newdata
185             sol = 0
186             while True:
187                 eol = string.find(data, "\n", sol)
188                 if eol < 0:
189                     break
190                 yield data[sol:eol+1]
191                 sol = eol+1
192             data = data[sol:]
193         if data != '':
194             yield data
195
196     def as_manifest(self):
197         manifest_text = ['.']
198         manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
199         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
200         return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
201
202 class StreamReader(object):
203     def __init__(self, tokens, keep=None, debug=False, _empty=False):
204         self._stream_name = None
205         self._data_locators = []
206         self._files = collections.OrderedDict()
207
208         if keep != None:
209             self._keep = keep
210         else:
211             self._keep = Keep.global_client_object()
212
213         streamoffset = 0L
214
215         # parse stream
216         for tok in tokens:
217             if debug: print 'tok', tok
218             if self._stream_name == None:
219                 self._stream_name = tok.replace('\\040', ' ')
220                 continue
221
222             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
223             if s:
224                 blocksize = long(s.group(1))
225                 self._data_locators.append([tok, blocksize, streamoffset])
226                 streamoffset += blocksize
227                 continue
228
229             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
230             if s:
231                 pos = long(s.group(1))
232                 size = long(s.group(2))
233                 name = s.group(3).replace('\\040', ' ')
234                 if name not in self._files:
235                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
236                 else:
237                     n = self._files[name]
238                     n.segments.append([pos, size, n.size()])
239                 continue
240
241             raise errors.SyntaxError("Invalid manifest format")
242
243     def name(self):
244         return self._stream_name
245
246     def files(self):
247         return self._files
248
249     def all_files(self):
250         return self._files.values()
251
252     def size(self):
253         n = self._data_locators[-1]
254         return n[OFFSET] + n[BLOCKSIZE]
255
256     def locators_and_ranges(self, range_start, range_size):
257         return locators_and_ranges(self._data_locators, range_start, range_size)
258
259     def readfrom(self, start, size):
260         """Read up to 'size' bytes from the stream, starting at 'start'"""
261         if size == 0:
262             return ''
263         data = ''
264         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
265             data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
266         return data
267
268     def manifest_text(self, strip=False):
269         manifest_text = [self.name().replace(' ', '\\040')]
270         if strip:
271             for d in self._data_locators:
272                 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
273                 manifest_text.append(m.group(0))
274         else:
275             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
276         manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
277                                         for seg in f.segments])
278                               for f in self._files.values()])
279         return ' '.join(manifest_text) + '\n'