4823: Working on method documentation and comments for arvfile
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 from .ranges import *
6 from .errors import ArgumentError
7 from arvados.retry import retry_method
8 import config
9 import hashlib
10 import re
11 import threading
12 import Queue
12
13 def split(path):
14     """split(path) -> streamname, filename
15
16     Separate the stream name and file name in a /-separated stream path.
17     If no stream name is available, assume '.'.
18     """
19     try:
20         stream_name, file_name = path.rsplit('/', 1)
21     except ValueError:  # No / in string
22         stream_name, file_name = '.', path
23     return stream_name, file_name
24
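A quick illustration of the behavior described in the docstring above (interactive-session sketch):

    >>> split('dir1/dir2/file.txt')
    ('dir1/dir2', 'file.txt')
    >>> split('file.txt')
    ('.', 'file.txt')
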
25 class ArvadosFileBase(object):
26     def __init__(self, name, mode):
27         self.name = name
28         self.mode = mode
29         self.closed = False
30
31     @staticmethod
32     def _before_close(orig_func):
33         @functools.wraps(orig_func)
34         def wrapper(self, *args, **kwargs):
35             if self.closed:
36                 raise ValueError("I/O operation on closed stream file")
37             return orig_func(self, *args, **kwargs)
38         return wrapper
39
40     def __enter__(self):
41         return self
42
43     def __exit__(self, exc_type, exc_value, traceback):
44         try:
45             self.close()
46         except Exception:
47             if exc_type is None:
48                 raise
49
50     def close(self):
51         self.closed = True
52
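A minimal sketch of how this base class behaves as a context manager and how the _before_close guard works; ArvadosFileBase is normally only used through the subclasses defined below:

    f = ArvadosFileBase('example.txt', 'rb')
    with f:
        pass              # close() runs when the with-block exits
    assert f.closed
    # After close(), any method wrapped with _before_close raises
    # ValueError("I/O operation on closed stream file").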
53
54 class ArvadosFileReaderBase(ArvadosFileBase):
55     class _NameAttribute(str):
56         # The Python file API provides a plain .name attribute.
57         # Older SDK provided a name() method.
58         # This class provides both, for maximum compatibility.
59         def __call__(self):
60             return self
61
62     def __init__(self, name, mode, num_retries=None):
63         super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
64         self._filepos = 0L
65         self.num_retries = num_retries
66         self._readline_cache = (None, None)
67
68     def __iter__(self):
69         while True:
70             data = self.readline()
71             if not data:
72                 break
73             yield data
74
75     def decompressed_name(self):
76         return re.sub('\.(bz2|gz)$', '', self.name)
77
78     @ArvadosFileBase._before_close
79     def seek(self, pos, whence=os.SEEK_CUR):
80         if whence == os.SEEK_CUR:
81             pos += self._filepos
82         elif whence == os.SEEK_END:
83             pos += self.size()
84         self._filepos = min(max(pos, 0L), self.size())
85
86     def tell(self):
87         return self._filepos
88
89     @ArvadosFileBase._before_close
90     @retry_method
91     def readall(self, size=2**20, num_retries=None):
92         while True:
93             data = self.read(size, num_retries=num_retries)
94             if data == '':
95                 break
96             yield data
97
98     @ArvadosFileBase._before_close
99     @retry_method
100     def readline(self, size=float('inf'), num_retries=None):
101         cache_pos, cache_data = self._readline_cache
102         if self.tell() == cache_pos:
103             data = [cache_data]
104         else:
105             data = ['']
106         data_size = len(data[-1])
107         while (data_size < size) and ('\n' not in data[-1]):
108             next_read = self.read(2 ** 20, num_retries=num_retries)
109             if not next_read:
110                 break
111             data.append(next_read)
112             data_size += len(next_read)
113         data = ''.join(data)
114         try:
115             nextline_index = data.index('\n') + 1
116         except ValueError:
117             nextline_index = len(data)
118         nextline_index = min(nextline_index, size)
119         self._readline_cache = (self.tell(), data[nextline_index:])
120         return data[:nextline_index]
121
122     @ArvadosFileBase._before_close
123     @retry_method
124     def decompress(self, decompress, size, num_retries=None):
125         for segment in self.readall(size, num_retries):
126             data = decompress(segment)
127             if data:
128                 yield data
129
130     @ArvadosFileBase._before_close
131     @retry_method
132     def readall_decompressed(self, size=2**20, num_retries=None):
133         self.seek(0)
134         if self.name.endswith('.bz2'):
135             dc = bz2.BZ2Decompressor()
136             return self.decompress(dc.decompress, size,
137                                    num_retries=num_retries)
138         elif self.name.endswith('.gz'):
139             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
140             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
141                                    size, num_retries=num_retries)
142         else:
143             return self.readall(size, num_retries=num_retries)
144
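For example, a caller holding a reader for a '.gz' file (the `reader` name below is hypothetical) could stream its decompressed content to a local file like this:

    with open('output.txt', 'wb') as out:
        for chunk in reader.readall_decompressed(size=2 ** 20):
            out.write(chunk)
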
145     @ArvadosFileBase._before_close
146     @retry_method
147     def readlines(self, sizehint=float('inf'), num_retries=None):
148         data = []
149         data_size = 0
150         for s in self.readall(num_retries=num_retries):
151             data.append(s)
152             data_size += len(s)
153             if data_size >= sizehint:
154                 break
155         return ''.join(data).splitlines(True)
156
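Because __iter__ above is defined in terms of readline(), any concrete reader (such as StreamFileReader or ArvadosFileReader below) can be consumed line by line; `reader` and `handle_line` here are hypothetical placeholders:

    for line in reader:       # each iteration yields one '\n'-terminated line
        handle_line(line)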
157
158 class StreamFileReader(ArvadosFileReaderBase):
159     def __init__(self, stream, segments, name):
160         super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
161         self._stream = stream
162         self.segments = segments
163
164     def stream_name(self):
165         return self._stream.name()
166
167     def size(self):
168         n = self.segments[-1]
169         return n.range_start + n.range_size
170
171     @ArvadosFileBase._before_close
172     @retry_method
173     def read(self, size, num_retries=None):
174         """Read up to 'size' bytes from the stream, starting at the current file position"""
175         if size == 0:
176             return ''
177
178         data = ''
179         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
180         if available_chunks:
181             lr = available_chunks[0]
182             data = self._stream._readfrom(lr.locator+lr.segment_offset,
183                                           lr.segment_size,
184                                           num_retries=num_retries)
185
186         self._filepos += len(data)
187         return data
188
189     @ArvadosFileBase._before_close
190     @retry_method
191     def readfrom(self, start, size, num_retries=None):
192         """Read up to 'size' bytes from the stream, starting at 'start'"""
193         if size == 0:
194             return ''
195
196         data = []
197         for lr in locators_and_ranges(self.segments, start, size):
198             data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
199                                               num_retries=num_retries))
200         return ''.join(data)
201
202     def as_manifest(self):
203         from stream import normalize_stream
204         segs = []
205         for r in self.segments:
206             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
207         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
208
209
210 class BufferBlock(object):
211     '''
212     A BufferBlock is a stand-in for a Keep block that is in the process of being
213     written.  Writers can append to it, get the size, and compute the Keep locator.
214
215     There are three valid states:
216
217     WRITABLE - can append
218
219     PENDING - is in the process of being uploaded to Keep; append is an error
220
221     COMMITTED - the block has been written to Keep, its internal buffer has been
222     released, and the BufferBlock should be discarded in favor of fetching the
223     block through normal Keep means.
224     '''
225     WRITABLE = 0
226     PENDING = 1
227     COMMITTED = 2
228
229     def __init__(self, blockid, starting_capacity):
230         '''
231         blockid: the identifier for this block
232         starting_capacity: the initial buffer capacity
233         '''
234         self.blockid = blockid
235         self.buffer_block = bytearray(starting_capacity)
236         self.buffer_view = memoryview(self.buffer_block)
237         self.write_pointer = 0
238         self.state = BufferBlock.WRITABLE
239         self._locator = None
240
241     def append(self, data):
242         '''
243         Append some data to the buffer.  Only valid if the block is in WRITABLE
244         state.  Implements an expanding buffer, doubling capacity as needed to
245         accommodate all the data.
246         '''
247         if self.state == BufferBlock.WRITABLE:
248             while (self.write_pointer+len(data)) > len(self.buffer_block):
249                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
250                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
251                 self.buffer_block = new_buffer_block
252                 self.buffer_view = memoryview(self.buffer_block)
253             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
254             self.write_pointer += len(data)
255             self._locator = None
256         else:
257             raise AssertionError("Buffer block is not writable")
258
259     def size(self):
260         '''Amount of data written to the buffer'''
261         return self.write_pointer
262
263     def locator(self):
264         '''The Keep locator for this buffer's contents.'''
265         if self._locator is None:
266             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
267         return self._locator
268
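An illustrative sketch of the BufferBlock life cycle (the values are made up; the locator is the MD5 hex digest of the buffered bytes, a '+', and the byte count):

    bb = BufferBlock("bufferblock0", starting_capacity=8)
    bb.append("hello ")
    bb.append("world")        # capacity doubles from 8 to 16 bytes to hold 11
    assert bb.size() == 11
    assert bb.locator() == "5eb63bbbe01eeed093cb22bb8f5acdc3+11"
    # Once handed to BlockManager.commit_bufferblock() the state becomes
    # PENDING, and further append() calls raise AssertionError.
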
269 class AsyncKeepWriteErrors(Exception):
270     '''
271     Roll up one or more Keep write exceptions (generated by background
272     threads) into a single one.
273     '''
274     def __init__(self, errors):
275         self.errors = errors
276
277     def __repr__(self):
278         return "\n".join(str(err) for err in self.errors)
279
280 class BlockManager(object):
281     '''
282     BlockManager handles buffer blocks, background block uploads, and
283     background block prefetch for a Collection of ArvadosFiles.
284     '''
285     def __init__(self, keep):
286         '''keep: KeepClient object to use'''
287         self._keep = keep
288         self._bufferblocks = {}
289         self._put_queue = None
290         self._put_errors = None
291         self._put_threads = None
292         self._prefetch_queue = None
293         self._prefetch_threads = None
294
295     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14):
296         '''
297         Allocate a new, empty bufferblock in WRITABLE state and return it.
298         blockid: optional block identifier, otherwise one will be automatically assigned
299         starting_capacity: optional capacity, otherwise will use default capacity
300         '''
301         if blockid is None:
302             blockid = "bufferblock%i" % len(self._bufferblocks)
303         bb = BufferBlock(blockid, starting_capacity=starting_capacity)
304         self._bufferblocks[bb.blockid] = bb
305         return bb
306
307     def stop_threads(self):
308         '''
309         Shut down and wait for background upload and download threads to finish.
310         '''
311         if self._put_threads is not None:
312             for t in self._put_threads:
313                 self._put_queue.put(None)
314             for t in self._put_threads:
315                 t.join()
316         self._put_threads = None
317         self._put_queue = None
318         self._put_errors = None
319
320         if self._prefetch_threads is not None:
321             for t in self._prefetch_threads:
322                 self._prefetch_queue.put(None)
323             for t in self._prefetch_threads:
324                 t.join()
325         self._prefetch_threads = None
326         self._prefetch_queue = None
327
328     def commit_bufferblock(self, block):
329         '''
330         Initiate a background upload of a bufferblock.  This will block if the
331         upload queue is at capacity, otherwise it will return immediately.
332         '''
333
334         def worker(self):
335             '''
336             Background uploader thread.
337             '''
338             while True:
339                 try:
340                     b = self._put_queue.get()
341                     if b is None:
342                         return
343                     b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
344                     b.state = BufferBlock.COMMITTED
345                     b.buffer_view = None
346                     b.buffer_block = None
347                 except Exception as e:
348                     print e
349                     self._put_errors.put(e)
350                 finally:
351                     if self._put_queue is not None:
352                         self._put_queue.task_done()
353
354         if self._put_threads is None:
355             # Start uploader threads.
356
357             # If we don't limit the Queue size, the upload queue can quickly
358             # grow to take up gigabytes of RAM if the writing process is
359             # generating data more quickly than it can be sent to the Keep
360             # servers.
361             #
362             # With two upload threads and a queue size of 2, this means up to 4
363             # blocks pending.  If they are full 64 MiB blocks, that means up to
364             # 256 MiB of internal buffering, which is the same size as the
365             # default download block cache in KeepClient.
366             self._put_queue = Queue.Queue(maxsize=2)
367             self._put_errors = Queue.Queue()
368             self._put_threads = [threading.Thread(target=worker, args=(self,)),
369                                  threading.Thread(target=worker, args=(self,))]
370             for t in self._put_threads:
371                 t.daemon = True
372                 t.start()
373
374         # Mark the block as PENDING to disallow any more appends.
375         block.state = BufferBlock.PENDING
376         self._put_queue.put(block)
377
378     def get_block(self, locator, num_retries, cache_only=False):
379         '''
380         Fetch a block.  First checks to see if the locator refers to a BufferBlock and
381         returns its contents if so; otherwise passes the request through to KeepClient.get().
382         '''
383         if locator in self._bufferblocks:
384             bb = self._bufferblocks[locator]
385             if bb.state != BufferBlock.COMMITTED:
386                 return bb.buffer_view[0:bb.write_pointer].tobytes()
387             else:
388                 locator = bb._locator
389         return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
390
391     def commit_all(self):
392         '''
393         Commit all outstanding buffer blocks.  Unlike commit_bufferblock(), this
394         is a synchronous call, and will not return until all buffer blocks are
395         uploaded.  Raises AsyncKeepWriteErrors() if any blocks failed to
396         upload.
397         '''
398         for k,v in self._bufferblocks.items():
399             if v.state == BufferBlock.WRITABLE:
400                 self.commit_bufferblock(v)
401         if self._put_queue is not None:
402             self._put_queue.join()
403             if not self._put_errors.empty():
404                 e = []
405                 try:
406                     while True:
407                         e.append(self._put_errors.get(False))
408                 except Queue.Empty:
409                     pass
410                 raise AsyncKeepWriteErrors(e)
411
412     def block_prefetch(self, locator):
413         '''
414         Initiate a background download of a block.  This assumes that the
415         underlying KeepClient implements a block cache, so repeated requests
416         for the same block will not result in repeated downloads (unless the
417         block is evicted from the cache).  This method does not block.
418         '''
419         def worker(self):
420             '''Background downloader thread.'''
421             while True:
422                 try:
423                     b = self._prefetch_queue.get()
424                     if b is None:
425                         return
426                     self._keep.get(b)
427                 except:
428                     pass
429
430         if locator in self._bufferblocks:
431             return
432         if self._prefetch_threads is None:
433             self._prefetch_queue = Queue.Queue()
434             self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
435                                       threading.Thread(target=worker, args=(self,))]
436             for t in self._prefetch_threads:
437                 t.daemon = True
438                 t.start()
439         self._prefetch_queue.put(locator)
440
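A rough sketch of the write path through BlockManager, assuming `keep` is an existing KeepClient instance (error handling omitted):

    manager = BlockManager(keep)
    bb = manager.alloc_bufferblock()            # new block in WRITABLE state
    bb.append("some data")
    manager.commit_bufferblock(bb)              # queue background upload; state -> PENDING
    manager.commit_all()                        # block until every upload has finished
    data = manager.get_block(bb.blockid, num_retries=2)   # served via Keep once COMMITTED
    manager.stop_threads()                      # shut down worker threads when done
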
441 class ArvadosFile(object):
442     '''
443     Manages the underlying representation of a file in Keep as a sequence of
444     segments over a set of blocks, supporting random read/write access.
445     '''
446
447     def __init__(self, parent, stream=[], segments=[]):
448         '''
449         parent: the parent object; it must provide a _my_block_manager() method
450         stream: a list of Range objects representing a block stream
451         segments: a list of Range objects representing segments
452         '''
452         self.parent = parent
453         self._modified = True
454         self.segments = []
455         for s in segments:
456             self.add_segment(stream, s.locator, s.range_size)
457         self._current_bblock = None
458         self.lock = threading.Lock()
459
460     def clone(self):
461         '''
462         Make a copy of this file.
463         '''
464         # TODO: copy bufferblocks.
465         with self.lock:
466             cp = ArvadosFile(self.parent)
468             cp._modified = False
469             cp.segments = [Range(r.locator, r.range_start, r.range_size, r.segment_offset) for r in self.segments]
470             return cp
471
472     def set_unmodified(self):
473         self._modified = False
474
475     def modified(self):
476         return self._modified
477
478     def truncate(self, size):
479         new_segs = []
480         for r in self.segments:
481             range_end = r.range_start+r.range_size
482             if r.range_start >= size:
483                 # segment is past the truncate size, all done
484                 break
485             elif size < range_end:
486                 nr = Range(r.locator, r.range_start, size - r.range_start)
487                 nr.segment_offset = r.segment_offset
488                 new_segs.append(nr)
489                 break
490             else:
491                 new_segs.append(r)
492
493         self.segments = new_segs
494         self._modified = True
495
496     def readfrom(self, offset, size, num_retries):
497         if size == 0 or offset >= self.size():
498             return ''
499         data = []
500
501         for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE):
502             self.parent._my_block_manager().block_prefetch(lr.locator)
503
504         for lr in locators_and_ranges(self.segments, offset, size):
505             d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
506             if d:
507                 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
508             else:
509                 break
510         return ''.join(data)
511
512     def _repack_writes(self):
513         '''Test whether the buffer block has more data than is referenced by actual
514         segments (this happens when a buffered write overwrites a file range written
515         in a previous buffered write).  If so, re-pack the buffer block for efficiency
516         and to avoid leaking unreferenced data.
517         '''
518         segs = self.segments
519
520         # Sum up the segments to get the total bytes of the file referencing
521         # into the buffer block.
522         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
523         write_total = sum([s.range_size for s in bufferblock_segs])
524
525         if write_total < self._current_bblock.size():
526             # There is more data in the buffer block than is actually accounted for by segments, so
527             # re-pack into a new buffer by copying over to a new buffer block.
528             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total)
529             for t in bufferblock_segs:
530                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
531                 t.segment_offset = new_bb.size() - t.range_size
532
533             self._current_bblock = new_bb
534
535     def writeto(self, offset, data, num_retries):
536         if len(data) == 0:
537             return
538
539         if offset > self.size():
540             raise ArgumentError("Offset is past the end of the file")
541
542         if len(data) > config.KEEP_BLOCK_SIZE:
543             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
544
545         self._modified = True
546
547         if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
548             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()
549
550         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
551             self._repack_writes()
552             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
553                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
554                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()
555
556         self._current_bblock.append(data)
557         replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
558
559     def add_segment(self, blocks, pos, size):
560         self._modified = True
561         for lr in locators_and_ranges(blocks, pos, size):
562             last = self.segments[-1] if self.segments else Range(0, 0, 0)
563             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
564             self.segments.append(r)
565
566     def size(self):
567         if self.segments:
568             n = self.segments[-1]
569             return n.range_start + n.range_size
570         else:
571             return 0
572
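A short sketch of random-access writes and reads through ArvadosFile, assuming `collection` is a hypothetical parent object that provides the _my_block_manager() hook used by the methods above:

    f = ArvadosFile(collection)
    f.writeto(0, "hello world", num_retries=2)
    f.writeto(6, "keep!", num_retries=2)        # overwrite "world" in place
    assert f.size() == 11
    assert f.readfrom(0, 11, num_retries=2) == "hello keep!"
    f.truncate(5)
    assert f.readfrom(0, 11, num_retries=2) == "hello"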
573
574 class ArvadosFileReader(ArvadosFileReaderBase):
575     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
576         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
577         self.arvadosfile = arvadosfile.clone()
578
579     def size(self):
580         return self.arvadosfile.size()
581
582     @ArvadosFileBase._before_close
583     @retry_method
584     def read(self, size, num_retries=None):
585         """Read up to 'size' bytes from the stream, starting at the current file position"""
586         data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
587         self._filepos += len(data)
588         return data
589
590     @ArvadosFileBase._before_close
591     @retry_method
592     def readfrom(self, offset, size, num_retries=None):
593         """Read up to 'size' bytes from the stream, starting at the current file position"""
594         return self.arvadosfile.readfrom(offset, size, num_retries)
595
596     def flush(self):
597         pass
598
599
600 class SynchronizedArvadosFile(object):
601     def __init__(self, arvadosfile):
602         self.arvadosfile = arvadosfile
603
604     def clone(self):
605         return self
606
607     def __getattr__(self, name):
608         with self.arvadosfile.lock:
609             return getattr(self.arvadosfile, name)
610
611
612 class ArvadosFileWriter(ArvadosFileReader):
613     def __init__(self, arvadosfile, name, mode, num_retries=None):
614         self.arvadosfile = SynchronizedArvadosFile(arvadosfile)
615         super(ArvadosFileWriter, self).__init__(self.arvadosfile, name, mode, num_retries=num_retries)
616
617     @ArvadosFileBase._before_close
618     @retry_method
619     def write(self, data, num_retries=None):
620         if self.mode[0] == "a":
621             self.arvadosfile.writeto(self.size(), data, num_retries)
622         else:
623             self.arvadosfile.writeto(self._filepos, data, num_retries)
624             self._filepos += len(data)
625
626     @ArvadosFileBase._before_close
627     @retry_method
628     def writelines(self, seq, num_retries=None):
629         for s in seq:
630             self.write(s, num_retries=num_retries)
631
632     def truncate(self, size=None):
633         if size is None:
634             size = self._filepos
635         self.arvadosfile.truncate(size)
636         if self._filepos > self.size():
637             self._filepos = self.size()
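
Finally, a usage sketch of ArvadosFileWriter, assuming `arv_file` is a freshly created, empty ArvadosFile; note that seek() defaults to os.SEEK_CUR, so an absolute seek needs an explicit whence:

    writer = ArvadosFileWriter(arv_file, "data.txt", "r+")
    writer.write("hello ")
    writer.write("world\n")
    writer.seek(0, os.SEEK_SET)
    assert writer.read(2 ** 20) == "hello world\n"
    writer.close()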