arvados.git: sdk/python/arvados/arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 from . import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12
13 from .errors import KeepWriteError, AssertionError, ArgumentError
14 from .keep import KeepLocator
15 from ._normalize_stream import normalize_stream
16 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
17 from .retry import retry_method
18
19 def split(path):
20     """split(path) -> streamname, filename
21
22     Separate the stream name and file name in a /-separated stream path and
23     return a tuple (stream_name, file_name).  If no stream name is available,
24     assume '.'.
25
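    For example:

        >>> split('data/input.txt')
        ('data', 'input.txt')
        >>> split('input.txt')
        ('.', 'input.txt')
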
26     """
27     try:
28         stream_name, file_name = path.rsplit('/', 1)
29     except ValueError:  # No / in string
30         stream_name, file_name = '.', path
31     return stream_name, file_name
32
33 class _FileLikeObjectBase(object):
34     def __init__(self, name, mode):
35         self.name = name
36         self.mode = mode
37         self.closed = False
38
39     @staticmethod
40     def _before_close(orig_func):
41         @functools.wraps(orig_func)
42         def before_close_wrapper(self, *args, **kwargs):
43             if self.closed:
44                 raise ValueError("I/O operation on closed stream file")
45             return orig_func(self, *args, **kwargs)
46         return before_close_wrapper
47
48     def __enter__(self):
49         return self
50
51     def __exit__(self, exc_type, exc_value, traceback):
52         try:
53             self.close()
54         except Exception:
55             if exc_type is None:
56                 raise
57
58     def close(self):
59         self.closed = True
60
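# _FileLikeObjectBase gives its subclasses context-manager support and a
# _before_close decorator that makes a method raise ValueError once the file
# has been closed.  A minimal usage sketch ('somefile' below stands in for an
# ArvadosFile instance and is hypothetical):
#
#   with ArvadosFileReader(somefile, 'example.txt') as f:
#       head = f.read(1024)
#   f.read(1024)   # raises ValueError: I/O operation on closed stream file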
61
62 class ArvadosFileReaderBase(_FileLikeObjectBase):
63     def __init__(self, name, mode, num_retries=None):
64         super(ArvadosFileReaderBase, self).__init__(name, mode)
65         self._filepos = 0L
66         self.num_retries = num_retries
67         self._readline_cache = (None, None)
68
69     def __iter__(self):
70         while True:
71             data = self.readline()
72             if not data:
73                 break
74             yield data
75
76     def decompressed_name(self):
77         return re.sub(r'\.(bz2|gz)$', '', self.name)
78
79     @_FileLikeObjectBase._before_close
80     def seek(self, pos, whence=os.SEEK_SET):
81         if whence == os.SEEK_CUR:
82             pos += self._filepos
83         elif whence == os.SEEK_END:
84             pos += self.size()
85         self._filepos = min(max(pos, 0L), self.size())
86
87     def tell(self):
88         return self._filepos
89
90     @_FileLikeObjectBase._before_close
91     @retry_method
92     def readall(self, size=2**20, num_retries=None):
93         while True:
94             data = self.read(size, num_retries=num_retries)
95             if data == '':
96                 break
97             yield data
98
99     @_FileLikeObjectBase._before_close
100     @retry_method
101     def readline(self, size=float('inf'), num_retries=None):
102         cache_pos, cache_data = self._readline_cache
103         if self.tell() == cache_pos:
104             data = [cache_data]
105         else:
106             data = ['']
107         data_size = len(data[-1])
108         while (data_size < size) and ('\n' not in data[-1]):
109             next_read = self.read(2 ** 20, num_retries=num_retries)
110             if not next_read:
111                 break
112             data.append(next_read)
113             data_size += len(next_read)
114         data = ''.join(data)
115         try:
116             nextline_index = data.index('\n') + 1
117         except ValueError:
118             nextline_index = len(data)
119         nextline_index = min(nextline_index, size)
120         self._readline_cache = (self.tell(), data[nextline_index:])
121         return data[:nextline_index]
122
123     @_FileLikeObjectBase._before_close
124     @retry_method
125     def decompress(self, decompress, size, num_retries=None):
126         for segment in self.readall(size, num_retries):
127             data = decompress(segment)
128             if data:
129                 yield data
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def readall_decompressed(self, size=2**20, num_retries=None):
134         self.seek(0)
135         if self.name.endswith('.bz2'):
136             dc = bz2.BZ2Decompressor()
137             return self.decompress(dc.decompress, size,
138                                    num_retries=num_retries)
139         elif self.name.endswith('.gz'):
140             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
141             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
142                                    size, num_retries=num_retries)
143         else:
144             return self.readall(size, num_retries=num_retries)
145
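    # A sketch of how the readall_decompressed() generator above is typically
    # consumed; 'reader' and 'handle_chunk' are hypothetical names:
    #
    #   for chunk in reader.readall_decompressed():
    #       handle_chunk(chunk)
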
146     @_FileLikeObjectBase._before_close
147     @retry_method
148     def readlines(self, sizehint=float('inf'), num_retries=None):
149         data = []
150         data_size = 0
151         for s in self.readall(num_retries=num_retries):
152             data.append(s)
153             data_size += len(s)
154             if data_size >= sizehint:
155                 break
156         return ''.join(data).splitlines(True)
157
158     def size(self):
159         raise NotImplementedError()
160
161     def read(self, size, num_retries=None):
162         raise NotImplementedError()
163
164     def readfrom(self, start, size, num_retries=None):
165         raise NotImplementedError()
166
167
168 class StreamFileReader(ArvadosFileReaderBase):
169     class _NameAttribute(str):
170         # The Python file API provides a plain .name attribute.
171         # Older SDK provided a name() method.
172         # This class provides both, for maximum compatibility.
173         def __call__(self):
174             return self
175
176     def __init__(self, stream, segments, name):
177         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
178         self._stream = stream
179         self.segments = segments
180
181     def stream_name(self):
182         return self._stream.name()
183
184     def size(self):
185         n = self.segments[-1]
186         return n.range_start + n.range_size
187
188     @_FileLikeObjectBase._before_close
189     @retry_method
190     def read(self, size, num_retries=None):
191         """Read up to 'size' bytes from the stream, starting at the current file position"""
192         if size == 0:
193             return ''
194
195         data = ''
196         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
197         if available_chunks:
198             lr = available_chunks[0]
199             data = self._stream.readfrom(lr.locator+lr.segment_offset,
200                                           lr.segment_size,
201                                           num_retries=num_retries)
202
203         self._filepos += len(data)
204         return data
205
206     @_FileLikeObjectBase._before_close
207     @retry_method
208     def readfrom(self, start, size, num_retries=None):
209         """Read up to 'size' bytes from the stream, starting at 'start'"""
210         if size == 0:
211             return ''
212
213         data = []
214         for lr in locators_and_ranges(self.segments, start, size):
215             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
216                                               num_retries=num_retries))
217         return ''.join(data)
218
219     def as_manifest(self):
220         segs = []
221         for r in self.segments:
222             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
223         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
224
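    # For reference, the single-file manifest text returned by as_manifest()
    # above has the normalized form (locator and sizes are illustrative):
    #
    #   . 930ba1ce3bf0fcf4a2ed3285a23a4b67+1000 0:1000:example.txt
    #
    # i.e. the stream name, one or more block locators, then pos:size:filename
    # segment descriptors, terminated by a newline.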
225
226 def synchronized(orig_func):
227     @functools.wraps(orig_func)
228     def synchronized_wrapper(self, *args, **kwargs):
229         with self.lock:
230             return orig_func(self, *args, **kwargs)
231     return synchronized_wrapper
232
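# A sketch of what @synchronized provides: any method so decorated holds
# self.lock for the duration of the call.  The Counter class below is purely
# illustrative and not part of this module:
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.value = 0
#
#       @synchronized
#       def increment(self):
#           self.value += 1
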
233 class _BufferBlock(object):
234     """A stand-in for a Keep block that is in the process of being written.
235
236     Writers can append to it, get the size, and compute the Keep locator.
237     There are three valid states:
238
239     WRITABLE
240       Can append to block.
241
242     PENDING
243       Block is in the process of being uploaded to Keep; appending is an error.
244
245     COMMITTED
246       The block has been written to Keep; its internal buffer has been
247       released.  Fetching the block now goes through the Keep client (since
248       the internal copy was discarded), and identifiers referring to the
249       BufferBlock can be replaced with the block locator.
250
251     """
252
253     WRITABLE = 0
254     PENDING = 1
255     COMMITTED = 2
256
257     def __init__(self, blockid, starting_capacity, owner):
258         """
259         :blockid:
260           the identifier for this block
261
262         :starting_capacity:
263           the initial buffer capacity
264
265         :owner:
266           ArvadosFile that owns this block
267
268         """
269         self.blockid = blockid
270         self.buffer_block = bytearray(starting_capacity)
271         self.buffer_view = memoryview(self.buffer_block)
272         self.write_pointer = 0
273         self._state = _BufferBlock.WRITABLE
274         self._locator = None
275         self.owner = owner
276         self.lock = threading.Lock()
277
278     @synchronized
279     def append(self, data):
280         """Append some data to the buffer.
281
282         Only valid if the block is in WRITABLE state.  Implements an expanding
283         buffer, doubling capacity as needed to accommodate all the data.
284
285         """
286         if self._state == _BufferBlock.WRITABLE:
287             while (self.write_pointer+len(data)) > len(self.buffer_block):
288                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
289                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
290                 self.buffer_block = new_buffer_block
291                 self.buffer_view = memoryview(self.buffer_block)
292             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
293             self.write_pointer += len(data)
294             self._locator = None
295         else:
296             raise AssertionError("Buffer block is not writable")
297
298     @synchronized
299     def set_state(self, nextstate, loc=None):
300         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
301             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
302             self._state = nextstate
303             if self._state == _BufferBlock.COMMITTED:
304                 self._locator = loc
305                 self.buffer_view = None
306                 self.buffer_block = None
307         else:
308             raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
309
310     @synchronized
311     def state(self):
312         return self._state
313
314     def size(self):
315         """The amount of data written to the buffer."""
316         return self.write_pointer
317
318     @synchronized
319     def locator(self):
320         """The Keep locator for this buffer's contents."""
321         if self._locator is None:
322             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
323         return self._locator
324
325     @synchronized
326     def clone(self, new_blockid, owner):
327         if self._state == _BufferBlock.COMMITTED:
328             raise AssertionError("Can only duplicate a writable or pending buffer block")
329         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
330         bufferblock.append(self.buffer_view[0:self.size()])
331         return bufferblock
332
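# A minimal sketch of the _BufferBlock life cycle described in its docstring.
# The committed locator shown is just the md5 of the example data, not a real
# response from Keep:
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello")                      # WRITABLE: appends are allowed
#   bb.set_state(_BufferBlock.PENDING)      # appending now raises AssertionError
#   bb.set_state(_BufferBlock.COMMITTED,
#                "5d41402abc4b2a76b9719d911017c592+5")   # buffer is released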
333
334 class NoopLock(object):
335     def __enter__(self):
336         return self
337
338     def __exit__(self, exc_type, exc_value, traceback):
339         pass
340
341     def acquire(self, blocking=False):
342         pass
343
344     def release(self):
345         pass
346
347
348 def must_be_writable(orig_func):
349     @functools.wraps(orig_func)
350     def must_be_writable_wrapper(self, *args, **kwargs):
351         if not self.writable():
352             raise IOError(errno.EROFS, "Collection must be writable.")
353         return orig_func(self, *args, **kwargs)
354     return must_be_writable_wrapper
355
356
357 class _BlockManager(object):
358     """BlockManager handles buffer blocks.
359
360     Also handles background block uploads, and background block prefetch for a
361     Collection of ArvadosFiles.
362
363     """
364     def __init__(self, keep):
365         """keep: KeepClient object to use"""
366         self._keep = keep
367         self._bufferblocks = {}
368         self._put_queue = None
369         self._put_errors = None
370         self._put_threads = None
371         self._prefetch_queue = None
372         self._prefetch_threads = None
373         self.lock = threading.Lock()
374         self.prefetch_enabled = True
375         self.num_put_threads = 2
376         self.num_get_threads = 2
377
378     @synchronized
379     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
380         """Allocate a new, empty bufferblock in WRITABLE state and return it.
381
382         :blockid:
383           optional block identifier, otherwise one will be automatically assigned
384
385         :starting_capacity:
386           optional capacity, otherwise will use default capacity
387
388         :owner:
389           ArvadosFile that owns this block
390
391         """
392         if blockid is None:
393             blockid = "bufferblock%i" % len(self._bufferblocks)
394         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
395         self._bufferblocks[bufferblock.blockid] = bufferblock
396         return bufferblock
397
398     @synchronized
399     def dup_block(self, block, owner):
400         """Create a new bufferblock initialized with the content of an existing bufferblock.
401
402         :block:
403           the buffer block to copy.
404
405         :owner:
406           ArvadosFile that owns the new block
407
408         """
409         new_blockid = "bufferblock%i" % len(self._bufferblocks)
410         bufferblock = block.clone(new_blockid, owner)
411         self._bufferblocks[bufferblock.blockid] = bufferblock
412         return bufferblock
413
414     @synchronized
415     def is_bufferblock(self, locator):
416         return locator in self._bufferblocks
417
418     @synchronized
419     def stop_threads(self):
420         """Shut down and wait for background upload and download threads to finish."""
421
422         if self._put_threads is not None:
423             for t in self._put_threads:
424                 self._put_queue.put(None)
425             for t in self._put_threads:
426                 t.join()
427         self._put_threads = None
428         self._put_queue = None
429         self._put_errors = None
430
431         if self._prefetch_threads is not None:
432             for t in self._prefetch_threads:
433                 self._prefetch_queue.put(None)
434             for t in self._prefetch_threads:
435                 t.join()
436         self._prefetch_threads = None
437         self._prefetch_queue = None
438
439     def commit_bufferblock(self, block):
440         """Initiate a background upload of a bufferblock.
441
442         This will block if the upload queue is at capacity, otherwise it will
443         return immediately.
444
445         """
446
447         def commit_bufferblock_worker(self):
448             """Background uploader thread."""
449
450             while True:
451                 try:
452                     bufferblock = self._put_queue.get()
453                     if bufferblock is None:
454                         return
455                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
456                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
457
458                 except Exception as e:
459                     self._put_errors.put((bufferblock.locator(), e))
460                 finally:
461                     if self._put_queue is not None:
462                         self._put_queue.task_done()
463
464         with self.lock:
465             if self._put_threads is None:
466                 # Start uploader threads.
467
468                 # If we don't limit the Queue size, the upload queue can quickly
469                 # grow to take up gigabytes of RAM if the writing process is
470                 # generating data more quickly than it can be sent to the Keep
471                 # servers.
472                 #
473                 # With two upload threads and a queue size of 2, this means up to 4
474                 # blocks pending.  If they are full 64 MiB blocks, that means up to
475                 # 256 MiB of internal buffering, which is the same size as the
476                 # default download block cache in KeepClient.
477                 self._put_queue = Queue.Queue(maxsize=2)
478                 self._put_errors = Queue.Queue()
479
480                 self._put_threads = []
481                 for i in xrange(0, self.num_put_threads):
482                     thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
483                     self._put_threads.append(thread)
484                     thread.daemon = True
485                     thread.start()
486
487         if block.state() == _BufferBlock.WRITABLE:
488             # Mark the block as PENDING to disallow any further appends.
489             block.set_state(_BufferBlock.PENDING)
490             self._put_queue.put(block)
491
492     @synchronized
493     def get_bufferblock(self, locator):
494         return self._bufferblocks.get(locator)
495
496     def get_block_contents(self, locator, num_retries, cache_only=False):
497         """Fetch a block.
498
499         First checks to see if the locator is a BufferBlock and returns its
500         contents if so; otherwise passes the request through to KeepClient.get().
501
502         """
503         with self.lock:
504             if locator in self._bufferblocks:
505                 bufferblock = self._bufferblocks[locator]
506                 if bufferblock.state() != _BufferBlock.COMMITTED:
507                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
508                 else:
509                     locator = bufferblock._locator
510         if cache_only:
511             return self._keep.get_from_cache(locator)
512         else:
513             return self._keep.get(locator, num_retries=num_retries)
514
515     def commit_all(self):
516         """Commit all outstanding buffer blocks.
517
518         Unlike commit_bufferblock(), this is a synchronous call, and will not
519         return until all buffer blocks are uploaded.  Raises
520         KeepWriteError() if any blocks failed to upload.
521
522         """
523         with self.lock:
524             items = self._bufferblocks.items()
525
526         for k,v in items:
527             v.owner.flush()
528
529         with self.lock:
530             if self._put_queue is not None:
531                 self._put_queue.join()
532
533                 if not self._put_errors.empty():
534                     err = []
535                     try:
536                         while True:
537                             err.append(self._put_errors.get(False))
538                     except Queue.Empty:
539                         pass
540                     raise KeepWriteError("Error writing some blocks", err, label="block")
541
542     def block_prefetch(self, locator):
543         """Initiate a background download of a block.
544
545         This assumes that the underlying KeepClient implements a block cache,
546         so repeated requests for the same block will not result in repeated
547         downloads (unless the block is evicted from the cache).  This method
548         does not block.
549
550         """
551
552         if not self.prefetch_enabled:
553             return
554
555         def block_prefetch_worker(self):
556             """The background downloader thread."""
557             while True:
558                 try:
559                     b = self._prefetch_queue.get()
560                     if b is None:
561                         return
562                     self._keep.get(b)
563                 except Exception:
564                     pass
565
566         with self.lock:
567             if locator in self._bufferblocks:
568                 return
569             if self._prefetch_threads is None:
570                 self._prefetch_queue = Queue.Queue()
571                 self._prefetch_threads = []
572                 for i in xrange(0, self.num_get_threads):
573                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
574                     self._prefetch_threads.append(thread)
575                     thread.daemon = True
576                     thread.start()
577         self._prefetch_queue.put(locator)
578
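# A minimal usage sketch for _BlockManager; 'keep_client' stands in for an
# already-configured arvados.keep.KeepClient instance and is hypothetical:
#
#   manager = _BlockManager(keep_client)
#   bb = manager.alloc_bufferblock()
#   bb.append("some data")
#   manager.commit_bufferblock(bb)    # queue a background upload to Keep
#   manager.stop_threads()            # drain the queue and join the workers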
579
580 class ArvadosFile(object):
581     """Represent a file in a Collection.
582
583     ArvadosFile manages the underlying representation of a file in Keep as a
584     sequence of segments spanning a set of blocks, and implements random
585     read/write access.
586
587     This object may be accessed from multiple threads.
588
589     """
590
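    # An illustration of the segment model described above: a 6-byte file whose
    # first three bytes live in one Keep block and last three in another could
    # be represented as (locators shortened for readability):
    #
    #   [Range('aaaa...+3', 0, 3, 0),
    #    Range('bbbb...+3', 3, 3, 0)]
    #
    # where the Range fields are (locator, range_start, range_size,
    # segment_offset within the block).
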
591     def __init__(self, parent, stream=[], segments=[]):
592         """
593         ArvadosFile constructor.
594
595         :stream:
596           a list of Range objects representing a block stream
597
598         :segments:
599           a list of Range objects representing segments
600         """
601         self.parent = parent
602         self._modified = True
603         self._segments = []
604         self.lock = parent.root_collection().lock
605         for s in segments:
606             self._add_segment(stream, s.locator, s.range_size)
607         self._current_bblock = None
608
609     def writable(self):
610         return self.parent.writable()
611
612     @synchronized
613     def segments(self):
614         return copy.copy(self._segments)
615
616     @synchronized
617     def clone(self, new_parent):
618         """Make a copy of this file."""
619         cp = ArvadosFile(new_parent)
620         cp.replace_contents(self)
621         return cp
622
623     @must_be_writable
624     @synchronized
625     def replace_contents(self, other):
626         """Replace segments of this file with segments from another `ArvadosFile` object."""
627
628         map_loc = {}
629         self._segments = []
630         for other_segment in other.segments():
631             new_loc = other_segment.locator
632             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
633                 if other_segment.locator not in map_loc:
634                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
635                     if bufferblock.state() != _BufferBlock.WRITABLE:
636                         map_loc[other_segment.locator] = bufferblock.locator()
637                     else:
638                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
639                 new_loc = map_loc[other_segment.locator]
640
641             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
642
643         self._modified = True
644
645     def __eq__(self, other):
646         if other is self:
647             return True
648         if not isinstance(other, ArvadosFile):
649             return False
650
651         othersegs = other.segments()
652         with self.lock:
653             if len(self._segments) != len(othersegs):
654                 return False
655             for i in xrange(0, len(othersegs)):
656                 seg1 = self._segments[i]
657                 seg2 = othersegs[i]
658                 loc1 = seg1.locator
659                 loc2 = seg2.locator
660
661                 if self.parent._my_block_manager().is_bufferblock(loc1):
662                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
663
664                 if other.parent._my_block_manager().is_bufferblock(loc2):
665                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
666
667                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
668                     seg1.range_start != seg2.range_start or
669                     seg1.range_size != seg2.range_size or
670                     seg1.segment_offset != seg2.segment_offset):
671                     return False
672
673         return True
674
675     def __ne__(self, other):
676         return not self.__eq__(other)
677
678     @synchronized
679     def set_unmodified(self):
680         """Clear the modified flag"""
681         self._modified = False
682
683     @synchronized
684     def modified(self):
685         """Test the modified flag"""
686         return self._modified
687
688     @must_be_writable
689     @synchronized
690     def truncate(self, size):
691         """Shrink the size of the file.
692
693         If `size` is less than the size of the file, the file contents after
694         `size` will be discarded.  If `size` is greater than the current size
695         of the file, an IOError will be raised.
696
697         """
698         if size < self.size():
699             new_segs = []
700             for r in self._segments:
701                 range_end = r.range_start+r.range_size
702                 if r.range_start >= size:
703                     # segment is past the truncate size, all done
704                     break
705                 elif size < range_end:
706                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
707                     nr.segment_offset = r.segment_offset
708                     new_segs.append(nr)
709                     break
710                 else:
711                     new_segs.append(r)
712
713             self._segments = new_segs
714             self._modified = True
715         elif size > self.size():
716             raise IOError("truncate() does not support extending the file size")
717
718
719     def readfrom(self, offset, size, num_retries, exact=False):
720         """Read up to `size` bytes from the file starting at `offset`.
721
722         :exact:
723          If False (default), return less data than requested if the read
724          crosses a block boundary and the next block isn't cached.  If True,
725          only return less data than requested when hitting EOF.
726         """
727
728         with self.lock:
729             if size == 0 or offset >= self.size():
730                 return ''
731             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
732             readsegs = locators_and_ranges(self._segments, offset, size)
733
734         for lr in prefetch:
735             self.parent._my_block_manager().block_prefetch(lr.locator)
736
737         data = []
738         for lr in readsegs:
739             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
740             if block:
741                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
742             else:
743                 break
744         return ''.join(data)
745
746     def _repack_writes(self, num_retries):
747         """Test if the buffer block has more data than actual segments.
748
749         This happens when a buffered write over-writes a file range written in
750         a previous buffered write.  Re-pack the buffer block for efficiency
751         and to avoid leaking information.
752
753         """
754         segs = self._segments
755
756         # Sum up the segments that reference this buffer block to get the
757         # total number of bytes of the file actually stored in it.
758         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
759         write_total = sum([s.range_size for s in bufferblock_segs])
760
761         if write_total < self._current_bblock.size():
762             # There is more data in the buffer block than is actually accounted for by segments, so
763             # re-pack into a new buffer by copying over to a new buffer block.
764             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
765             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
766             for t in bufferblock_segs:
767                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
768                 t.segment_offset = new_bb.size() - t.range_size
769
770             self._current_bblock = new_bb
771
772     @must_be_writable
773     @synchronized
774     def writeto(self, offset, data, num_retries):
775         """Write `data` to the file starting at `offset`.
776
777         This will update existing bytes and/or extend the size of the file as
778         necessary.
779
780         """
781         if len(data) == 0:
782             return
783
784         if offset > self.size():
785             raise ArgumentError("Offset is past the end of the file")
786
787         if len(data) > config.KEEP_BLOCK_SIZE:
788             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
789
790         self._modified = True
791
792         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
793             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
794
795         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
796             self._repack_writes(num_retries)
797             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
798                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
799                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
800
801         self._current_bblock.append(data)
802
803         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
804
805     @synchronized
806     def flush(self, num_retries=0):
807         if self._current_bblock:
808             self._repack_writes(num_retries)
809             self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
810
811     @must_be_writable
812     @synchronized
813     def add_segment(self, blocks, pos, size):
814         """Add a segment to the end of the file.
815
816         `pos` and `size` reference a section of the stream described by
817         `blocks` (a list of Range objects)
818
819         """
820         self._add_segment(blocks, pos, size)
821
822     def _add_segment(self, blocks, pos, size):
823         """Internal implementation of add_segment."""
824         self._modified = True
825         for lr in locators_and_ranges(blocks, pos, size):
826             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
827             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
828             self._segments.append(r)
829
830     @synchronized
831     def size(self):
832         """Get the file size."""
833         if self._segments:
834             n = self._segments[-1]
835             return n.range_start + n.range_size
836         else:
837             return 0
838
839     @synchronized
840     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
841         buf = ""
842         filestream = []
843         for segment in self._segments:
844             loc = segment.locator
845             if loc.startswith("bufferblock"):
846                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
847             if portable_locators:
848                 loc = KeepLocator(loc).stripped()
849             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
850                                  segment.segment_offset, segment.range_size))
851         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
852         buf += "\n"
853         return buf
854
855
856 class ArvadosFileReader(ArvadosFileReaderBase):
857     """Wraps ArvadosFile in a file-like object supporting reading only.
858
859     Be aware that this class is NOT thread safe as there is no locking around
860     updating the file pointer.
861
862     """
863
864     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
865         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
866         self.arvadosfile = arvadosfile
867
868     def size(self):
869         return self.arvadosfile.size()
870
871     def stream_name(self):
872         return self.arvadosfile.parent.stream_name()
873
874     @_FileLikeObjectBase._before_close
875     @retry_method
876     def read(self, size, num_retries=None):
877         """Read up to `size` bytes from the stream, starting at the current file position."""
878         data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
879         self._filepos += len(data)
880         return data
881
882     @_FileLikeObjectBase._before_close
883     @retry_method
884     def readfrom(self, offset, size, num_retries=None):
885         """Read up to `size` bytes from the stream, starting at the current file position."""
886         return self.arvadosfile.readfrom(offset, size, num_retries)
887
888     def flush(self):
889         pass
890
891
892 class ArvadosFileWriter(ArvadosFileReader):
893     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
894
895     Be aware that this class is NOT thread safe as there is no locking around
896     updating the file pointer.
897
898     """
899
900     def __init__(self, arvadosfile, name, mode, num_retries=None):
901         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
902
903     @_FileLikeObjectBase._before_close
904     @retry_method
905     def write(self, data, num_retries=None):
906         if self.mode[0] == "a":
907             self.arvadosfile.writeto(self.size(), data, num_retries)
908         else:
909             self.arvadosfile.writeto(self._filepos, data, num_retries)
910             self._filepos += len(data)
911
912     @_FileLikeObjectBase._before_close
913     @retry_method
914     def writelines(self, seq, num_retries=None):
915         for s in seq:
916             self.write(s, num_retries)
917
918     @_FileLikeObjectBase._before_close
919     def truncate(self, size=None):
920         if size is None:
921             size = self._filepos
922         self.arvadosfile.truncate(size)
923         if self._filepos > self.size():
924             self._filepos = self.size()
925
926     @_FileLikeObjectBase._before_close
927     def flush(self):
928         self.arvadosfile.flush()
929
930     def close(self):
931         if not self.closed:
932             self.flush()
933             super(ArvadosFileWriter, self).close()
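
# A minimal end-to-end sketch of the wrappers above.  It assumes the companion
# Collection class in arvados.collection, whose open() method hands back the
# ArvadosFileWriter / ArvadosFileReader objects defined here; treat it as an
# illustration rather than part of this module's API:
#
#   import arvados.collection
#   coll = arvados.collection.Collection()
#   with coll.open('example.txt', 'w') as writer:
#       writer.write('hello world\n')
#   with coll.open('example.txt', 'r') as reader:
#       print reader.read(5)          # prints 'hello'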