3699: collection copying bug fixes
[arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import httplib
3 import httplib2
4 import os
5 import pprint
6 import sys
7 import types
8 import subprocess
9 import json
10 import UserDict
11 import re
12 import hashlib
13 import string
14 import bz2
15 import zlib
16 import fcntl
17 import time
18 import threading
19 import collections
20
21 from arvados.retry import retry_method
22 from keep import *
23 import config
24 import errors
25
26 LOCATOR = 0
27 BLOCKSIZE = 1
28 OFFSET = 2
29 SEGMENTSIZE = 3
30
31 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
32     '''
33     Get blocks that are covered by the range
34     data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
35     range_start: start of range
36     range_size: size of range
37     returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
38     '''
39     if range_size == 0:
40         return []
41     resp = []
42     range_start = long(range_start)
43     range_size = long(range_size)
44     range_end = range_start + range_size
45     block_start = 0L
46
47     # range_start/block_start is the inclusive lower bound
48     # range_end/block_end is the exclusive upper bound
49
50     hi = len(data_locators)
51     lo = 0
52     i = int((hi + lo) / 2)
53     block_size = data_locators[i][BLOCKSIZE]
54     block_start = data_locators[i][OFFSET]
55     block_end = block_start + block_size
56     if debug: print '---'
57
58     # perform a binary search for the first block
59     # assumes that all of the blocks are contigious, so range_start is guaranteed
60     # to either fall into the range of a block or be outside the block range entirely
61     while not (range_start >= block_start and range_start < block_end):
62         if lo == i:
63             # must be out of range, fail
64             return []
65         if range_start > block_start:
66             lo = i
67         else:
68             hi = i
69         i = int((hi + lo) / 2)
70         if debug: print lo, i, hi
71         block_size = data_locators[i][BLOCKSIZE]
72         block_start = data_locators[i][OFFSET]
73         block_end = block_start + block_size
74
75     while i < len(data_locators):
76         locator, block_size, block_start = data_locators[i]
77         block_end = block_start + block_size
78         if debug:
79             print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
80         if range_end <= block_start:
81             # range ends before this block starts, so don't look at any more locators
82             break
83
84         #if range_start >= block_end:
85             # range starts after this block ends, so go to next block
86             # we should always start at the first block due to the binary above, so this test is redundant
87             #next
88
89         if range_start >= block_start and range_end <= block_end:
90             # range starts and ends in this block
91             resp.append([locator, block_size, range_start - block_start, range_size])
92         elif range_start >= block_start and range_end > block_end:
93             # range starts in this block
94             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
95         elif range_start < block_start and range_end > block_end:
96             # range starts in a previous block and extends to further blocks
97             resp.append([locator, block_size, 0L, block_size])
98         elif range_start < block_start and range_end <= block_end:
99             # range starts in a previous block and ends in this block
100             resp.append([locator, block_size, 0L, range_end - block_start])
101         block_start = block_end
102         i += 1
103     return resp
104
105
106 class StreamFileReader(object):
107     def __init__(self, stream, segments, name):
108         self._stream = stream
109         self.segments = segments
110         self._name = name
111         self._filepos = 0L
112         self.num_retries = stream.num_retries
113
114     def name(self):
115         return self._name
116
117     def decompressed_name(self):
118         return re.sub('\.(bz2|gz)$', '', self._name)
119
120     def stream_name(self):
121         return self._stream.name()
122
123     def seek(self, pos):
124         self._filepos = min(max(pos, 0L), self.size())
125
126     def tell(self):
127         return self._filepos
128
129     def size(self):
130         n = self.segments[-1]
131         return n[OFFSET] + n[BLOCKSIZE]
132
133     @retry_method
134     def read(self, size, num_retries=None):
135         """Read up to 'size' bytes from the stream, starting at the current file position"""
136         if size == 0:
137             return ''
138
139         data = ''
140         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
141         if available_chunks:
142             locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
143             data = self._stream.readfrom(locator+segmentoffset, segmentsize,
144                                          num_retries=num_retries)
145
146         self._filepos += len(data)
147         return data
148
149     @retry_method
150     def readfrom(self, start, size, num_retries=None):
151         """Read up to 'size' bytes from the stream, starting at 'start'"""
152         if size == 0:
153             return ''
154
155         data = []
156         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
157             data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
158                                               num_retries=num_retries))
159         return ''.join(data)
160
161     @retry_method
162     def readall(self, size=2**20, num_retries=None):
163         while True:
164             data = self.read(size, num_retries=num_retries)
165             if data == '':
166                 break
167             yield data
168
169     @retry_method
170     def decompress(self, decompress, size, num_retries=None):
171         for segment in self.readall(size, num_retries):
172             data = decompress(segment)
173             if data and data != '':
174                 yield data
175
176     @retry_method
177     def readall_decompressed(self, size=2**20, num_retries=None):
178         self.seek(0)
179         if re.search('\.bz2$', self._name):
180             dc = bz2.BZ2Decompressor()
181             return self.decompress(dc.decompress, size,
182                                    num_retries=num_retries)
183         elif re.search('\.gz$', self._name):
184             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
185             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
186                                    size, num_retries=num_retries)
187         else:
188             return self.readall(size, num_retries=num_retries)
189
190     @retry_method
191     def readlines(self, decompress=True, num_retries=None):
192         read_func = self.readall_decompressed if decompress else self.readall
193         data = ''
194         for newdata in read_func(num_retries=num_retries):
195             data += newdata
196             sol = 0
197             while True:
198                 eol = string.find(data, "\n", sol)
199                 if eol < 0:
200                     break
201                 yield data[sol:eol+1]
202                 sol = eol+1
203             data = data[sol:]
204         if data != '':
205             yield data
206
207     def as_manifest(self):
208         manifest_text = ['.']
209         manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
210         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
211         return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
212
213
214 class StreamReader(object):
215     def __init__(self, tokens, keep=None, debug=False, _empty=False,
216                  num_retries=0):
217         self._stream_name = None
218         self._data_locators = []
219         self._files = collections.OrderedDict()
220         self._keep = keep
221         self.num_retries = num_retries
222
223         streamoffset = 0L
224
225         # parse stream
226         for tok in tokens:
227             if debug: print 'tok', tok
228             if self._stream_name == None:
229                 self._stream_name = tok.replace('\\040', ' ')
230                 continue
231
232             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
233             if s:
234                 blocksize = long(s.group(1))
235                 self._data_locators.append([tok, blocksize, streamoffset])
236                 streamoffset += blocksize
237                 continue
238
239             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
240             if s:
241                 pos = long(s.group(1))
242                 size = long(s.group(2))
243                 name = s.group(3).replace('\\040', ' ')
244                 if name not in self._files:
245                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
246                 else:
247                     n = self._files[name]
248                     n.segments.append([pos, size, n.size()])
249                 continue
250
251             raise errors.SyntaxError("Invalid manifest format")
252
253     def name(self):
254         return self._stream_name
255
256     def files(self):
257         return self._files
258
259     def all_files(self):
260         return self._files.values()
261
262     def size(self):
263         n = self._data_locators[-1]
264         return n[OFFSET] + n[BLOCKSIZE]
265
266     def locators_and_ranges(self, range_start, range_size):
267         return locators_and_ranges(self._data_locators, range_start, range_size)
268
269     @retry_method
270     def readfrom(self, start, size, num_retries=None):
271         """Read up to 'size' bytes from the stream, starting at 'start'"""
272         if size == 0:
273             return ''
274         if self._keep is None:
275             self._keep = KeepClient(num_retries=self.num_retries)
276         data = []
277         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
278             data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
279         return ''.join(data)
280
281     def manifest_text(self, strip=False):
282         manifest_text = [self.name().replace(' ', '\\040')]
283         if strip:
284             for d in self._data_locators:
285                 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
286                 manifest_text.append(m.group(0))
287         else:
288             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
289         manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
290                                         for seg in f.segments])
291                               for f in self._files.values()])
292         return ' '.join(manifest_text) + '\n'