# arvados.git: sdk/python/arvados/arvfile.py
import bz2
import functools
import hashlib
import os
import re
import zlib

from . import config
from .errors import ArgumentError
from .keep import KeepClient
from .ranges import Range, locators_and_ranges, replace_range
from arvados.retry import retry_method

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path.
    If no stream name is available, assume '.'.
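    For example (illustrative): split('foo/bar.txt') returns
    ('foo', 'bar.txt'), and split('bar.txt') returns ('.', 'bar.txt').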
    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name


class ArvadosFileBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True
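    # Illustrative usage (not from the original source): instances support
    # the context-manager protocol, so
    #     with SomeReader(...) as f:   # SomeReader is a hypothetical subclass
    #         data = f.read(1024)
    # calls close() automatically on exit.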


class ArvadosFileReaderBase(ArvadosFileBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older versions of the SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @ArvadosFileBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @ArvadosFileBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @ArvadosFileBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
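        # Serve leftover bytes cached by the previous readline() call when the
        # file position has not moved since then; otherwise start fresh.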
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @ArvadosFileBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @ArvadosFileBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
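            # wbits=16+zlib.MAX_WBITS tells zlib to expect a gzip header
            # and trailer rather than a raw zlib stream.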
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @ArvadosFileBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)


class StreamFileReader(ArvadosFileReaderBase):
    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @ArvadosFileBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream._readfrom(lr.locator+lr.segment_offset,
                                          lr.segment_size,
                                          num_retries=num_retries)

        self._filepos += len(data)
        return data

    @ArvadosFileBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                               num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
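        # Render this file as a one-stream manifest line, e.g.
        # ". <locator>+<size> 0:<size>:<filename>\n" (Arvados manifest format).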
        from .stream import normalize_stream
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


class BufferBlock(object):
    """An in-memory buffer that accumulates written data destined for a
    single Keep block.
    """
    def __init__(self, locator, starting_size=2**14):
        self.locator = locator
        self.buffer_block = bytearray(starting_size)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0

    def append(self, data):
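        # Double the backing bytearray until the new data fits, then copy the
        # bytes in at the write pointer.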
        while (self.write_pointer+len(data)) > len(self.buffer_block):
            new_buffer_block = bytearray(len(self.buffer_block) * 2)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)
        self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
        self.write_pointer += len(data)

    def size(self):
        return self.write_pointer

    def calculate_locator(self):
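        # Keep locators take the form "<md5 hexdigest>+<size in bytes>".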
        return "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
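    # Illustrative sketch (not from the original source):
    #     bb = BufferBlock("bufferblock0")
    #     bb.append("hello")
    #     bb.size()                # 5
    #     bb.calculate_locator()   # "5d41402abc4b2a76b9719d911017c592+5"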


class ArvadosFile(object):
    def __init__(self, stream=[], segments=[], keep=None):
        """
        stream: a list of Range objects representing a block stream
        segments: a list of Range objects representing segments
        """
        self._modified = True
        self._segments = []
        for s in segments:
            self.add_segment(stream, s.range_start, s.range_size)
        self._current_bblock = None
        self._bufferblocks = None
        self._keep = keep

    def set_unmodified(self):
        self._modified = False

    def modified(self):
        return self._modified

    def truncate(self, size):
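        # Keep segments that end at or before the new size, clip the segment
        # that straddles it, and drop everything past it.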
        new_segs = []
        for r in self._segments:
            range_end = r.range_start+r.range_size
            if r.range_start >= size:
                # segment is past the truncate size, all done
                break
            elif size < range_end:
                nr = Range(r.locator, r.range_start, size - r.range_start)
                nr.segment_offset = r.segment_offset
                new_segs.append(nr)
                break
            else:
                new_segs.append(r)

        self._segments = new_segs
        self._modified = True

    def _keepget(self, locator, num_retries):
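        # Serve reads from an uncommitted buffer block when we have one;
        # otherwise fetch the block from Keep.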
        if self._bufferblocks and locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            return bb.buffer_view[0:bb.write_pointer].tobytes()
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def readfrom(self, offset, size, num_retries):
        if size == 0 or offset >= self.size():
            return ''
        if self._keep is None:
            self._keep = KeepClient(num_retries=num_retries)
        data = []
        # TODO: initiate prefetch on all blocks in the range (offset, offset + size + config.KEEP_BLOCK_SIZE)

        for lr in locators_and_ranges(self._segments, offset, size):
            # TODO: if data is empty, wait on block get, otherwise only
            # get more data if the block is already in the cache.
            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
        return ''.join(data)

    def _init_bufferblock(self):
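        # Buffer blocks get a placeholder locator ("bufferblockN"); a real
        # content-hash locator is presumably assigned when the block is
        # committed to Keep.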
        if self._bufferblocks is None:
            self._bufferblocks = {}
        self._current_bblock = BufferBlock("bufferblock%i" % len(self._bufferblocks))
        self._bufferblocks[self._current_bblock.locator] = self._current_bblock

    def _repack_writes(self):
        """Test if the buffer block has more data than is referenced by actual
        segments (this happens when a buffered write over-writes a file range
        written in a previous buffered write).  Re-pack the buffer block for
        efficiency and to avoid leaking information.
        """
        # TODO: fixme
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.locator]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = BufferBlock(self._current_bblock.locator, starting_size=write_total)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
            self._bufferblocks[self._current_bblock.locator] = self._current_bblock

    def writeto(self, offset, data, num_retries):
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None:
            self._init_bufferblock()

        if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
                self._init_bufferblock()

        self._current_bblock.append(data)
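        # Point the newly written file range at the bytes just appended to the
        # current buffer block.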
        replace_range(self._segments, offset, len(data), self._current_bblock.locator, self._current_bblock.write_pointer - len(data))

    def add_segment(self, blocks, pos, size):
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    def size(self):
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0
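    # Illustrative sketch (not from the original source): buffered writes land
    # in a BufferBlock and are immediately readable back, e.g.
    #     af = ArvadosFile(keep=keep_client)   # keep_client is hypothetical
    #     af.writeto(0, "hello", num_retries=0)
    #     af.readfrom(0, 5, num_retries=0)     # returns "hello"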


class ArvadosFileReader(ArvadosFileReaderBase):
    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    @ArvadosFileBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the file, starting at the current file position"""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
        self._filepos += len(data)
        return data

    @ArvadosFileBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to 'size' bytes from the file, starting at 'offset'"""
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    def __init__(self, arvadosfile, name, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

    @ArvadosFileBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @ArvadosFileBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
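    # Illustrative sketch (not from the original source):
    #     w = ArvadosFileWriter(ArvadosFile(keep=keep_client), "out.txt", "w")
    #     w.write("hello world")   # keep_client above is hypothetical
    #     w.truncate(5)            # file contents are now "hello"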