caching wip
[arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import collections
21
22 from keep import *
23 import config
24 import errors
25
26 LOCATOR = 0
27 BLOCKSIZE = 1
28 OFFSET = 2
29 SEGMENTSIZE = 3
30
31 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
32     '''
33     Get blocks that are covered by the range
34     data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
35     range_start: start of range
36     range_size: size of range
37     returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
38     '''
39     if range_size == 0:
40         return []
41     resp = []
42     range_start = long(range_start)
43     range_size = long(range_size)
44     range_end = range_start + range_size
45     block_start = 0L
46
47     # range_start/block_start is the inclusive lower bound
48     # range_end/block_end is the exclusive upper bound
49
50     hi = len(data_locators)
51     lo = 0
52     i = int((hi + lo) / 2)
53     block_size = data_locators[i][BLOCKSIZE]
54     block_start = data_locators[i][OFFSET]
55     block_end = block_start + block_size
56     if debug: print '---'
57
58     # perform a binary search for the first block
59     # assumes that all of the blocks are contigious, so range_start is guaranteed
60     # to either fall into the range of a block or be outside the block range entirely
61     while not (range_start >= block_start and range_start < block_end):
62         if lo == i:
63             # must be out of range, fail
64             return []
65         if range_start > block_start:
66             lo = i
67         else:
68             hi = i
69         i = int((hi + lo) / 2)
70         if debug: print lo, i, hi
71         block_size = data_locators[i][BLOCKSIZE]
72         block_start = data_locators[i][OFFSET]
73         block_end = block_start + block_size
74     
75     while i < len(data_locators):
76         locator, block_size, block_start = data_locators[i]
77         block_end = block_start + block_size
78         if debug:
79             print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
80         if range_end <= block_start:
81             # range ends before this block starts, so don't look at any more locators
82             break
83
84         #if range_start >= block_end:
85             # range starts after this block ends, so go to next block
86             # we should always start at the first block due to the binary above, so this test is redundant
87             #next
88
89         if range_start >= block_start and range_end <= block_end:
90             # range starts and ends in this block
91             resp.append([locator, block_size, range_start - block_start, range_size])
92         elif range_start >= block_start and range_end > block_end:
93             # range starts in this block
94             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
95         elif range_start < block_start and range_end > block_end:
96             # range starts in a previous block and extends to further blocks
97             resp.append([locator, block_size, 0L, block_size])
98         elif range_start < block_start and range_end <= block_end:
99             # range starts in a previous block and ends in this block
100             resp.append([locator, block_size, 0L, range_end - block_start])
101         block_start = block_end
102         i += 1
103     return resp
104
105
106 class StreamFileReader(object):
107     def __init__(self, stream, segments, name):
108         self._stream = stream
109         self.segments = segments
110         self._name = name
111         self._filepos = 0L
112
113     def name(self):
114         return self._name
115
116     def decompressed_name(self):
117         return re.sub('\.(bz2|gz)$', '', self._name)
118
119     def stream_name(self):
120         return self._stream.name()
121
122     def seek(self, pos):
123         self._filepos = min(max(pos, 0L), self.size())
124
125     def tell(self):
126         return self._filepos
127
128     def size(self):
129         n = self.segments[-1]
130         return n[OFFSET] + n[BLOCKSIZE]
131
132     def read(self, size):
133         """Read up to 'size' bytes from the stream, starting at the current file position"""
134         if size == 0:
135             return ''
136
137         data = ''
138         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
139             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
140             self._filepos += len(data)
141         return data
142
143     def readfrom(self, start, size):
144         """Read up to 'size' bytes from the stream, starting at 'start'"""
145         if size == 0:
146             return ''
147
148         data = ''
149         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
150             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
151         return data
152
153     def readall(self, size=2**20):
154         while True:
155             data = self.read(size)
156             if data == '':
157                 break
158             yield data
159
160     def decompress(self, decompress, size):
161         for segment in self.readall(size):
162             data = decompress(segment)
163             if data and data != '':
164                 yield data
165
166     def readall_decompressed(self, size=2**20):
167         self.seek(0)
168         if re.search('\.bz2$', self._name):
169             dc = bz2.BZ2Decompressor()
170             return self.decompress(lambda segment: dc.decompress(segment), size)
171         elif re.search('\.gz$', self._name):
172             dc = zlib.decompressobj(16+zlib.MAX_WBITS)            
173             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
174         else:
175             return self.readall(size)
176
177     def readlines(self, decompress=True):
178         if decompress:
179             datasource = self.readall_decompressed()
180         else:
181             datasource = self.readall()
182         data = ''
183         for newdata in datasource:
184             data += newdata
185             sol = 0
186             while True:
187                 eol = string.find(data, "\n", sol)
188                 if eol < 0:
189                     break
190                 yield data[sol:eol+1]
191                 sol = eol+1
192             data = data[sol:]
193         if data != '':
194             yield data
195
196 class StreamReader(object):
197     def __init__(self, tokens, keep=None, debug=False):
198         self._stream_name = None
199         self._data_locators = []
200         self._files = collections.OrderedDict()
201
202         if keep != None:
203             self._keep = keep
204         else:
205             self._keep = Keep.global_client_object()
206             
207         streamoffset = 0L
208
209         # parse stream
210         for tok in tokens:
211             if debug: print 'tok', tok
212             if self._stream_name == None:
213                 self._stream_name = tok.replace('\\040', ' ')
214                 continue
215
216             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
217             if s:
218                 blocksize = long(s.group(1))
219                 self._data_locators.append([tok, blocksize, streamoffset])
220                 streamoffset += blocksize
221                 continue
222
223             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
224             if s:
225                 pos = long(s.group(1))
226                 size = long(s.group(2))
227                 name = s.group(3).replace('\\040', ' ')
228                 if name not in self._files:
229                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
230                 else:
231                     n = self._files[name]
232                     n.segments.append([pos, size, n.size()])
233                 continue
234
235             raise errors.SyntaxError("Invalid manifest format")
236
237     def name(self):
238         return self._stream_name
239
240     def files(self):
241         return self._files
242
243     def all_files(self):
244         return self._files.values()
245
246     def size(self):
247         n = self._data_locators[-1]
248         return n[OFFSET] + n[BLOCKSIZE]
249
250     def locators_and_ranges(self, range_start, range_size):
251         return locators_and_ranges(self._data_locators, range_start, range_size)
252
253     def readfrom(self, start, size):
254         """Read up to 'size' bytes from the stream, starting at 'start'"""
255         if size == 0:
256             return ''
257         data = ''
258         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
259             data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
260         return data