4823: More docstrings for public Collection and ArvFile API
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 from .ranges import *
6 from arvados.retry import retry_method
7 import config
8 import hashlib
9 import hashlib
10 import threading
11 import Queue
12
13 def split(path):
14     """split(path) -> streamname, filename
15
16     Separate the stream name and file name in a /-separated stream path.
17     If no stream name is available, assume '.'.
18     """
19     try:
20         stream_name, file_name = path.rsplit('/', 1)
21     except ValueError:  # No / in string
22         stream_name, file_name = '.', path
23     return stream_name, file_name
24
25 class ArvadosFileBase(object):
26     def __init__(self, name, mode):
27         self.name = name
28         self.mode = mode
29         self.closed = False
30
31     @staticmethod
32     def _before_close(orig_func):
33         @functools.wraps(orig_func)
34         def wrapper(self, *args, **kwargs):
35             if self.closed:
36                 raise ValueError("I/O operation on closed stream file")
37             return orig_func(self, *args, **kwargs)
38         return wrapper
39
40     def __enter__(self):
41         return self
42
43     def __exit__(self, exc_type, exc_value, traceback):
44         try:
45             self.close()
46         except Exception:
47             if exc_type is None:
48                 raise
49
50     def close(self):
51         self.closed = True
52
53
54 class ArvadosFileReaderBase(ArvadosFileBase):
55     class _NameAttribute(str):
56         # The Python file API provides a plain .name attribute.
57         # Older SDK provided a name() method.
58         # This class provides both, for maximum compatibility.
59         def __call__(self):
60             return self
61
62     def __init__(self, name, mode, num_retries=None):
63         super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
64         self._filepos = 0L
65         self.num_retries = num_retries
66         self._readline_cache = (None, None)
67
68     def __iter__(self):
69         while True:
70             data = self.readline()
71             if not data:
72                 break
73             yield data
74
75     def decompressed_name(self):
76         return re.sub('\.(bz2|gz)$', '', self.name)
77
78     @ArvadosFileBase._before_close
79     def seek(self, pos, whence=os.SEEK_CUR):
80         if whence == os.SEEK_CUR:
81             pos += self._filepos
82         elif whence == os.SEEK_END:
83             pos += self.size()
84         self._filepos = min(max(pos, 0L), self.size())
85
86     def tell(self):
87         return self._filepos
88
89     @ArvadosFileBase._before_close
90     @retry_method
91     def readall(self, size=2**20, num_retries=None):
92         while True:
93             data = self.read(size, num_retries=num_retries)
94             if data == '':
95                 break
96             yield data
97
98     @ArvadosFileBase._before_close
99     @retry_method
100     def readline(self, size=float('inf'), num_retries=None):
101         cache_pos, cache_data = self._readline_cache
102         if self.tell() == cache_pos:
103             data = [cache_data]
104         else:
105             data = ['']
106         data_size = len(data[-1])
107         while (data_size < size) and ('\n' not in data[-1]):
108             next_read = self.read(2 ** 20, num_retries=num_retries)
109             if not next_read:
110                 break
111             data.append(next_read)
112             data_size += len(next_read)
113         data = ''.join(data)
114         try:
115             nextline_index = data.index('\n') + 1
116         except ValueError:
117             nextline_index = len(data)
118         nextline_index = min(nextline_index, size)
119         self._readline_cache = (self.tell(), data[nextline_index:])
120         return data[:nextline_index]
121
122     @ArvadosFileBase._before_close
123     @retry_method
124     def decompress(self, decompress, size, num_retries=None):
125         for segment in self.readall(size, num_retries):
126             data = decompress(segment)
127             if data:
128                 yield data
129
130     @ArvadosFileBase._before_close
131     @retry_method
132     def readall_decompressed(self, size=2**20, num_retries=None):
133         self.seek(0)
134         if self.name.endswith('.bz2'):
135             dc = bz2.BZ2Decompressor()
136             return self.decompress(dc.decompress, size,
137                                    num_retries=num_retries)
138         elif self.name.endswith('.gz'):
139             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
140             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
141                                    size, num_retries=num_retries)
142         else:
143             return self.readall(size, num_retries=num_retries)
144
145     @ArvadosFileBase._before_close
146     @retry_method
147     def readlines(self, sizehint=float('inf'), num_retries=None):
148         data = []
149         data_size = 0
150         for s in self.readall(num_retries=num_retries):
151             data.append(s)
152             data_size += len(s)
153             if data_size >= sizehint:
154                 break
155         return ''.join(data).splitlines(True)
156
157     def size(self):
158         raise NotImplementedError()
159
160     def read(self, size, num_retries=None):
161         raise NotImplementedError()
162
163     def readfrom(self, start, size, num_retries=None):
164         raise NotImplementedError()
165
166
167 class StreamFileReader(ArvadosFileReaderBase):
168     def __init__(self, stream, segments, name):
169         super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
170         self._stream = stream
171         self.segments = segments
172
173     def stream_name(self):
174         return self._stream.name()
175
176     def size(self):
177         n = self.segments[-1]
178         return n.range_start + n.range_size
179
180     @ArvadosFileBase._before_close
181     @retry_method
182     def read(self, size, num_retries=None):
183         """Read up to 'size' bytes from the stream, starting at the current file position"""
184         if size == 0:
185             return ''
186
187         data = ''
188         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
189         if available_chunks:
190             lr = available_chunks[0]
191             data = self._stream._readfrom(lr.locator+lr.segment_offset,
192                                           lr.segment_size,
193                                           num_retries=num_retries)
194
195         self._filepos += len(data)
196         return data
197
198     @ArvadosFileBase._before_close
199     @retry_method
200     def readfrom(self, start, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at 'start'"""
202         if size == 0:
203             return ''
204
205         data = []
206         for lr in locators_and_ranges(self.segments, start, size):
207             data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
208                                               num_retries=num_retries))
209         return ''.join(data)
210
211     def as_manifest(self):
212         from stream import normalize_stream
213         segs = []
214         for r in self.segments:
215             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
216         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
217
218
219 class BufferBlock(object):
220 '''
221 A BufferBlock is a stand-in for a Keep block that is in the process of being
222 written.  Writers can append to it, get the size, and compute the Keep locator.
223
224 There are three valid states:
225
226 WRITABLE - can append
227
228 PENDING - is in the process of being uploaded to Keep, append is an error
229
230 COMMITTED - the block has been written to Keep, its internal buffer has been
231 released, and the BufferBlock should be discarded in favor of fetching the
232 block through normal Keep means.
233 '''
234     WRITABLE = 0
235     PENDING = 1
236     COMMITTED = 2
237
238     def __init__(self, blockid, starting_capacity):
239         '''
240         blockid: the identifier for this block
241         starting_capacity: the initial buffer capacity
242         '''
243         self.blockid = blockid
244         self.buffer_block = bytearray(starting_capacity)
245         self.buffer_view = memoryview(self.buffer_block)
246         self.write_pointer = 0
247         self.state = BufferBlock.WRITABLE
248         self._locator = None
249
250     def append(self, data):
251         '''
252         Append some data to the buffer.  Only valid if the block is in WRITABLE
253         state.  Implements an expanding buffer, doubling capacity as needed to
254         accomdate all the data.
255         '''
256         if self.state == BufferBlock.WRITABLE:
257             while (self.write_pointer+len(data)) > len(self.buffer_block):
258                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
259                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
260                 self.buffer_block = new_buffer_block
261                 self.buffer_view = memoryview(self.buffer_block)
262             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
263             self.write_pointer += len(data)
264             self._locator = None
265         else:
266             raise AssertionError("Buffer block is not writable")
267
268     def size(self):
269         '''Amount of data written to the buffer'''
270         return self.write_pointer
271
272     def locator(self):
273         '''The Keep locator for this buffer's contents.'''
274         if self._locator is None:
275             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
276         return self._locator
277
278
279 class AsyncKeepWriteErrors(Exception):
280     '''
281     Roll up one or more Keep write exceptions (generated by background
282     threads) into a single one.
283     '''
284     def __init__(self, errors):
285         self.errors = errors
286
287     def __repr__(self):
288         return "\n".join(self.errors)
289
290 class BlockManager(object):
291     '''
292     BlockManager handles buffer blocks, background block uploads, and
293     background block prefetch for a Collection of ArvadosFiles.
294     '''
295     def __init__(self, keep):
296         '''keep: KeepClient object to use'''
297         self._keep = keep
298         self._bufferblocks = {}
299         self._put_queue = None
300         self._put_errors = None
301         self._put_threads = None
302         self._prefetch_queue = None
303         self._prefetch_threads = None
304
305     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14):
306         '''
307         Allocate a new, empty bufferblock in WRITABLE state and return it.
308         blockid: optional block identifier, otherwise one will be automatically assigned
309         starting_capacity: optional capacity, otherwise will use default capacity
310         '''
311         if blockid is None:
312             blockid = "bufferblock%i" % len(self._bufferblocks)
313         bb = BufferBlock(blockid, starting_capacity=starting_capacity)
314         self._bufferblocks[bb.blockid] = bb
315         return bb
316
317     def stop_threads(self):
318         '''
319         Shut down and wait for background upload and download threads to finish.
320         '''
321         if self._put_threads is not None:
322             for t in self._put_threads:
323                 self._put_queue.put(None)
324             for t in self._put_threads:
325                 t.join()
326         self._put_threads = None
327         self._put_queue = None
328         self._put_errors = None
329
330         if self._prefetch_threads is not None:
331             for t in self._prefetch_threads:
332                 self._prefetch_queue.put(None)
333             for t in self._prefetch_threads:
334                 t.join()
335         self._prefetch_threads = None
336         self._prefetch_queue = None
337
338     def commit_bufferblock(self, block):
339         '''
340         Initiate a background upload of a bufferblock.  This will block if the
341         upload queue is at capacity, otherwise it will return immediately.
342         '''
343
344         def worker(self):
345             '''
346             Background uploader thread.
347             '''
348             while True:
349                 try:
350                     b = self._put_queue.get()
351                     if b is None:
352                         return
353                     b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
354                     b.state = BufferBlock.COMMITTED
355                     b.buffer_view = None
356                     b.buffer_block = None
357                 except Exception as e:
358                     print e
359                     self._put_errors.put(e)
360                 finally:
361                     if self._put_queue is not None:
362                         self._put_queue.task_done()
363
364         if self._put_threads is None:
365             # Start uploader threads.
366
367             # If we don't limit the Queue size, the upload queue can quickly
368             # grow to take up gigabytes of RAM if the writing process is
369             # generating data more quickly than it can be send to the Keep
370             # servers.
371             #
372             # With two upload threads and a queue size of 2, this means up to 4
373             # blocks pending.  If they are full 64 MiB blocks, that means up to
374             # 256 MiB of internal buffering, which is the same size as the
375             # default download block cache in KeepClient.
376             self._put_queue = Queue.Queue(maxsize=2)
377             self._put_errors = Queue.Queue()
378             self._put_threads = [threading.Thread(target=worker, args=(self,)),
379                                  threading.Thread(target=worker, args=(self,))]
380             for t in self._put_threads:
381                 t.daemon = True
382                 t.start()
383
384         # Mark the block as PENDING so to disallow any more appends.
385         block.state = BufferBlock.PENDING
386         self._put_queue.put(block)
387
388     def get_block(self, locator, num_retries, cache_only=False):
389         '''
390         Fetch a block.  First checks to see if the locator is a BufferBlock and
391         return that, if not, passes the request through to KeepClient.get().
392         '''
393         if locator in self._bufferblocks:
394             bb = self._bufferblocks[locator]
395             if bb.state != BufferBlock.COMMITTED:
396                 return bb.buffer_view[0:bb.write_pointer].tobytes()
397             else:
398                 locator = bb._locator
399         return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
400
401     def commit_all(self):
402         '''
403         Commit all outstanding buffer blocks.  Unlike commit_bufferblock(), this
404         is a synchronous call, and will not return until all buffer blocks are
405         uploaded.  Raises AsyncKeepWriteErrors() if any blocks failed to
406         upload.
407         '''
408         for k,v in self._bufferblocks.items():
409             if v.state == BufferBlock.WRITABLE:
410                 self.commit_bufferblock(v)
411         if self._put_queue is not None:
412             self._put_queue.join()
413             if not self._put_errors.empty():
414                 e = []
415                 try:
416                     while True:
417                         e.append(self._put_errors.get(False))
418                 except Queue.Empty:
419                     pass
420                 raise AsyncKeepWriteErrors(e)
421
422     def block_prefetch(self, locator):
423         '''
424         Initiate a background download of a block.  This assumes that the
425         underlying KeepClient implements a block cache, so repeated requests
426         for the same block will not result in repeated downloads (unless the
427         block is evicted from the cache.)  This method does not block.
428         '''
429         def worker(self):
430             '''Background downloader thread.'''
431             while True:
432                 try:
433                     b = self._prefetch_queue.get()
434                     if b is None:
435                         return
436                     self._keep.get(b)
437                 except:
438                     pass
439
440         if locator in self._bufferblocks:
441             return
442         if self._prefetch_threads is None:
443             self._prefetch_queue = Queue.Queue()
444             self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
445                                       threading.Thread(target=worker, args=(self,))]
446             for t in self._prefetch_threads:
447                 t.daemon = True
448                 t.start()
449         self._prefetch_queue.put(locator)
450
451
452 class ArvadosFile(object):
453     '''
454     ArvadosFile manages the underlying representation of a file in Keep as a sequence of
455     segments spanning a set of blocks, and implements random read/write access.
456     '''
457
458     def __init__(self, parent, stream=[], segments=[]):
459         '''
460         stream: a list of Range objects representing a block stream
461         segments: a list of Range objects representing segments
462         '''
463         self.parent = parent
464         self._modified = True
465         self.segments = []
466         for s in segments:
467             self.add_segment(stream, s.locator, s.range_size)
468         self._current_bblock = None
469         self.lock = threading.Lock()
470
471     def clone(self):
472         '''Make a copy of this file.'''
473         # TODO: copy bufferblocks?
474         with self.lock:
475             cp = ArvadosFile()
476             cp.parent = self.parent
477             cp._modified = False
478             cp.segments = [Range(r.locator, r.range_start, r.range_size, r.segment_offset) for r in self.segments]
479             return cp
480
481     def set_unmodified(self):
482         '''Clear the modified flag'''
483         self._modified = False
484
485     def modified(self):
486         '''Test the modified flag'''
487         return self._modified
488
489     def truncate(self, size):
490         '''
491         Adjust the size of the file.  If "size" is less than the size of the file,
492         the file contents after "size" will be discarded.  If "size" is greater
493         than the current size of the file, an IOError will be raised.
494         '''
495         if size < self.size():
496             new_segs = []
497             for r in self.segments:
498                 range_end = r.range_start+r.range_size
499                 if r.range_start >= size:
500                     # segment is past the trucate size, all done
501                     break
502                 elif size < range_end:
503                     nr = Range(r.locator, r.range_start, size - r.range_start)
504                     nr.segment_offset = r.segment_offset
505                     new_segs.append(nr)
506                     break
507                 else:
508                     new_segs.append(r)
509
510             self.segments = new_segs
511             self._modified = True
512         elif size > self.size():
513             raise IOError("truncate() does not support extending the file size")
514
515
516     def readfrom(self, offset, size, num_retries):
517         '''
518         read upto "size" bytes from the file starting at "offset".
519         '''
520         if size == 0 or offset >= self.size():
521             return ''
522         data = []
523
524         for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE):
525             self.parent._my_block_manager().block_prefetch(lr.locator)
526
527         for lr in locators_and_ranges(self.segments, offset, size):
528             d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
529             if d:
530                 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
531             else:
532                 break
533         return ''.join(data)
534
535     def _repack_writes(self):
536         '''
537         Test if the buffer block has more data than is referenced by actual segments
538         (this happens when a buffered write over-writes a file range written in
539         a previous buffered write).  Re-pack the buffer block for efficiency
540         and to avoid leaking information.
541         '''
542         segs = self.segments
543
544         # Sum up the segments to get the total bytes of the file referencing
545         # into the buffer block.
546         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
547         write_total = sum([s.range_size for s in bufferblock_segs])
548
549         if write_total < self._current_bblock.size():
550             # There is more data in the buffer block than is actually accounted for by segments, so
551             # re-pack into a new buffer by copying over to a new buffer block.
552             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_size=write_total)
553             for t in bufferblock_segs:
554                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
555                 t.segment_offset = new_bb.size() - t.range_size
556
557             self._current_bblock = new_bb
558
559     def writeto(self, offset, data, num_retries):
560         '''
561         Write "data" to the file starting at "offset".  This will update
562         existing bytes and/or extend the size of the file as necessary.
563         '''
564         if len(data) == 0:
565             return
566
567         if offset > self.size():
568             raise ArgumentError("Offset is past the end of the file")
569
570         if len(data) > config.KEEP_BLOCK_SIZE:
571             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
572
573         self._modified = True
574
575         if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
576             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()
577
578         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
579             self._repack_writes()
580             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
581                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
582                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()
583
584         self._current_bblock.append(data)
585         replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
586
587     def add_segment(self, blocks, pos, size):
588         '''
589         Add a segment to the end of the file, with "pos" and "offset" referencing a
590         section of the stream described by "blocks" (a list of Range objects)
591         '''
592         self._modified = True
593         for lr in locators_and_ranges(blocks, pos, size):
594             last = self.segments[-1] if self.segments else Range(0, 0, 0)
595             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
596             self.segments.append(r)
597
598     def size(self):
599         '''Get the file size'''
600         if self.segments:
601             n = self.segments[-1]
602             return n.range_start + n.range_size
603         else:
604             return 0
605
606
607 class ArvadosFileReader(ArvadosFileReaderBase):
608     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
609         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
610         self.arvadosfile = arvadosfile.clone()
611
612     def size(self):
613         return self.arvadosfile.size()
614
615     @ArvadosFileBase._before_close
616     @retry_method
617     def read(self, size, num_retries=None):
618         """Read up to 'size' bytes from the stream, starting at the current file position"""
619         data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
620         self._filepos += len(data)
621         return data
622
623     @ArvadosFileBase._before_close
624     @retry_method
625     def readfrom(self, offset, size, num_retries=None):
626         """Read up to 'size' bytes from the stream, starting at the current file position"""
627         return self.arvadosfile.readfrom(offset, size, num_retries)
628
629     def flush(self):
630         pass
631
632
633 class SynchronizedArvadosFile(object):
634     def __init__(self, arvadosfile):
635         self.arvadosfile = arvadosfile
636
637     def clone(self):
638         return self
639
640     def __getattr__(self, name):
641         with self.arvadosfile.lock:
642             return getattr(self.arvadosfile, name)
643
644
645 class ArvadosFileWriter(ArvadosFileReader):
646     def __init__(self, arvadosfile, name, mode, num_retries=None):
647         self.arvadosfile = SynchronizedArvadosFile(arvadosfile)
648         super(ArvadosFileWriter, self).__init__(self.arvadosfile, name, mode, num_retries=num_retries)
649
650     @ArvadosFileBase._before_close
651     @retry_method
652     def write(self, data, num_retries=None):
653         if self.mode[0] == "a":
654             self.arvadosfile.writeto(self.size(), data)
655         else:
656             self.arvadosfile.writeto(self._filepos, data, num_retries)
657             self._filepos += len(data)
658
659     @ArvadosFileBase._before_close
660     @retry_method
661     def writelines(self, seq, num_retries=None):
662         for s in seq:
663             self.write(s)
664
665     def truncate(self, size=None):
666         if size is None:
667             size = self._filepos
668         self.arvadosfile.truncate(size)
669         if self._filepos > self.size():
670             self._filepos = self.size()