Merge branch '3627-selectable-projects' closes #3627
[arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import httplib
3 import httplib2
4 import os
5 import pprint
6 import sys
7 import types
8 import subprocess
9 import json
10 import UserDict
11 import re
12 import hashlib
13 import string
14 import bz2
15 import zlib
16 import fcntl
17 import time
18 import threading
19 import collections
20
21 from keep import *
22 import config
23 import errors
24
25 LOCATOR = 0
26 BLOCKSIZE = 1
27 OFFSET = 2
28 SEGMENTSIZE = 3
29
30 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
31     '''
32     Get blocks that are covered by the range
33     data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
34     range_start: start of range
35     range_size: size of range
36     returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
37     '''
38     if range_size == 0:
39         return []
40     resp = []
41     range_start = long(range_start)
42     range_size = long(range_size)
43     range_end = range_start + range_size
44     block_start = 0L
45
46     # range_start/block_start is the inclusive lower bound
47     # range_end/block_end is the exclusive upper bound
48
49     hi = len(data_locators)
50     lo = 0
51     i = int((hi + lo) / 2)
52     block_size = data_locators[i][BLOCKSIZE]
53     block_start = data_locators[i][OFFSET]
54     block_end = block_start + block_size
55     if debug: print '---'
56
57     # perform a binary search for the first block
58     # assumes that all of the blocks are contigious, so range_start is guaranteed
59     # to either fall into the range of a block or be outside the block range entirely
60     while not (range_start >= block_start and range_start < block_end):
61         if lo == i:
62             # must be out of range, fail
63             return []
64         if range_start > block_start:
65             lo = i
66         else:
67             hi = i
68         i = int((hi + lo) / 2)
69         if debug: print lo, i, hi
70         block_size = data_locators[i][BLOCKSIZE]
71         block_start = data_locators[i][OFFSET]
72         block_end = block_start + block_size
73
74     while i < len(data_locators):
75         locator, block_size, block_start = data_locators[i]
76         block_end = block_start + block_size
77         if debug:
78             print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
79         if range_end <= block_start:
80             # range ends before this block starts, so don't look at any more locators
81             break
82
83         #if range_start >= block_end:
84             # range starts after this block ends, so go to next block
85             # we should always start at the first block due to the binary above, so this test is redundant
86             #next
87
88         if range_start >= block_start and range_end <= block_end:
89             # range starts and ends in this block
90             resp.append([locator, block_size, range_start - block_start, range_size])
91         elif range_start >= block_start and range_end > block_end:
92             # range starts in this block
93             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
94         elif range_start < block_start and range_end > block_end:
95             # range starts in a previous block and extends to further blocks
96             resp.append([locator, block_size, 0L, block_size])
97         elif range_start < block_start and range_end <= block_end:
98             # range starts in a previous block and ends in this block
99             resp.append([locator, block_size, 0L, range_end - block_start])
100         block_start = block_end
101         i += 1
102     return resp
103
104
105 class StreamFileReader(object):
106     def __init__(self, stream, segments, name):
107         self._stream = stream
108         self.segments = segments
109         self._name = name
110         self._filepos = 0L
111
112     def name(self):
113         return self._name
114
115     def decompressed_name(self):
116         return re.sub('\.(bz2|gz)$', '', self._name)
117
118     def stream_name(self):
119         return self._stream.name()
120
121     def seek(self, pos):
122         self._filepos = min(max(pos, 0L), self.size())
123
124     def tell(self):
125         return self._filepos
126
127     def size(self):
128         n = self.segments[-1]
129         return n[OFFSET] + n[BLOCKSIZE]
130
131     def read(self, size):
132         """Read up to 'size' bytes from the stream, starting at the current file position"""
133         if size == 0:
134             return ''
135
136         data = ''
137         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
138             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
139         self._filepos += len(data)
140         return data
141
142     def readfrom(self, start, size):
143         """Read up to 'size' bytes from the stream, starting at 'start'"""
144         if size == 0:
145             return ''
146
147         data = ''
148         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
149             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
150         return data
151
152     def readall(self, size=2**20):
153         while True:
154             data = self.read(size)
155             if data == '':
156                 break
157             yield data
158
159     def decompress(self, decompress, size):
160         for segment in self.readall(size):
161             data = decompress(segment)
162             if data and data != '':
163                 yield data
164
165     def readall_decompressed(self, size=2**20):
166         self.seek(0)
167         if re.search('\.bz2$', self._name):
168             dc = bz2.BZ2Decompressor()
169             return self.decompress(lambda segment: dc.decompress(segment), size)
170         elif re.search('\.gz$', self._name):
171             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
172             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
173         else:
174             return self.readall(size)
175
176     def readlines(self, decompress=True):
177         if decompress:
178             datasource = self.readall_decompressed()
179         else:
180             datasource = self.readall()
181         data = ''
182         for newdata in datasource:
183             data += newdata
184             sol = 0
185             while True:
186                 eol = string.find(data, "\n", sol)
187                 if eol < 0:
188                     break
189                 yield data[sol:eol+1]
190                 sol = eol+1
191             data = data[sol:]
192         if data != '':
193             yield data
194
195     def as_manifest(self):
196         manifest_text = ['.']
197         manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
198         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
199         return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
200
201 class StreamReader(object):
202     def __init__(self, tokens, keep=None, debug=False, _empty=False):
203         self._stream_name = None
204         self._data_locators = []
205         self._files = collections.OrderedDict()
206
207         if keep is None:
208             keep = KeepClient()
209         self._keep = keep
210
211         streamoffset = 0L
212
213         # parse stream
214         for tok in tokens:
215             if debug: print 'tok', tok
216             if self._stream_name == None:
217                 self._stream_name = tok.replace('\\040', ' ')
218                 continue
219
220             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
221             if s:
222                 blocksize = long(s.group(1))
223                 self._data_locators.append([tok, blocksize, streamoffset])
224                 streamoffset += blocksize
225                 continue
226
227             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
228             if s:
229                 pos = long(s.group(1))
230                 size = long(s.group(2))
231                 name = s.group(3).replace('\\040', ' ')
232                 if name not in self._files:
233                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
234                 else:
235                     n = self._files[name]
236                     n.segments.append([pos, size, n.size()])
237                 continue
238
239             raise errors.SyntaxError("Invalid manifest format")
240
241     def name(self):
242         return self._stream_name
243
244     def files(self):
245         return self._files
246
247     def all_files(self):
248         return self._files.values()
249
250     def size(self):
251         n = self._data_locators[-1]
252         return n[OFFSET] + n[BLOCKSIZE]
253
254     def locators_and_ranges(self, range_start, range_size):
255         return locators_and_ranges(self._data_locators, range_start, range_size)
256
257     def readfrom(self, start, size):
258         """Read up to 'size' bytes from the stream, starting at 'start'"""
259         if size == 0:
260             return ''
261         data = ''
262         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
263             data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
264         return data
265
266     def manifest_text(self, strip=False):
267         manifest_text = [self.name().replace(' ', '\\040')]
268         if strip:
269             for d in self._data_locators:
270                 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
271                 manifest_text.append(m.group(0))
272         else:
273             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
274         manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
275                                         for seg in f.segments])
276                               for f in self._files.values()])
277         return ' '.join(manifest_text) + '\n'