6194: Fix typo in invocation of writeto() and use memoryview to avoid copying slices.
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12
13 from .errors import KeepWriteError, AssertionError, ArgumentError
14 from .keep import KeepLocator
15 from ._normalize_stream import normalize_stream
16 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
17 from .retry import retry_method
18
19 def split(path):
20     """split(path) -> streamname, filename
21
22     Separate the stream name and file name in a /-separated stream path and
23     return a tuple (stream_name, file_name).  If no stream name is available,
24     assume '.'.
25
26     """
27     try:
28         stream_name, file_name = path.rsplit('/', 1)
29     except ValueError:  # No / in string
30         stream_name, file_name = '.', path
31     return stream_name, file_name
32
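# Example (illustrative): a path with no '/' falls back to the default
# stream '.':
#
#     split("subdir/data.txt")   # -> ("subdir", "data.txt")
#     split("data.txt")          # -> (".", "data.txt")
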
33 class _FileLikeObjectBase(object):
34     def __init__(self, name, mode):
35         self.name = name
36         self.mode = mode
37         self.closed = False
38
39     @staticmethod
40     def _before_close(orig_func):
41         @functools.wraps(orig_func)
42         def before_close_wrapper(self, *args, **kwargs):
43             if self.closed:
44                 raise ValueError("I/O operation on closed stream file")
45             return orig_func(self, *args, **kwargs)
46         return before_close_wrapper
47
48     def __enter__(self):
49         return self
50
51     def __exit__(self, exc_type, exc_value, traceback):
52         try:
53             self.close()
54         except Exception:
55             if exc_type is None:
56                 raise
57
58     def close(self):
59         self.closed = True
60
61
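# Example (illustrative): _FileLikeObjectBase supplies the context-manager
# protocol and the close guard used by the readers and writers below.
#
#     with SomeReader("example.txt", "rb") as f:   # SomeReader: any subclass
#         ...                                      # f.closed is False here
#     f.read(10)   # raises ValueError("I/O operation on closed stream file"),
#                  # provided read() is wrapped with _before_close
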
62 class ArvadosFileReaderBase(_FileLikeObjectBase):
63     def __init__(self, name, mode, num_retries=None):
64         super(ArvadosFileReaderBase, self).__init__(name, mode)
65         self._filepos = 0L
66         self.num_retries = num_retries
67         self._readline_cache = (None, None)
68
69     def __iter__(self):
70         while True:
71             data = self.readline()
72             if not data:
73                 break
74             yield data
75
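    # Example (illustrative): because __iter__() is built on readline(), a
    # reader can be consumed line by line; each yielded line keeps its
    # trailing newline.
    #
    #     for line in reader:       # reader: any ArvadosFileReaderBase subclass
    #         handle(line)          # handle() is a placeholder
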
76     def decompressed_name(self):
77         return re.sub(r'\.(bz2|gz)$', '', self.name)
78
79     @_FileLikeObjectBase._before_close
80     def seek(self, pos, whence=os.SEEK_SET):
81         if whence == os.SEEK_CUR:
82             pos += self._filepos
83         elif whence == os.SEEK_END:
84             pos += self.size()
85         self._filepos = min(max(pos, 0L), self.size())
86
87     def tell(self):
88         return self._filepos
89
90     @_FileLikeObjectBase._before_close
91     @retry_method
92     def readall(self, size=2**20, num_retries=None):
93         while True:
94             data = self.read(size, num_retries=num_retries)
95             if data == '':
96                 break
97             yield data
98
99     @_FileLikeObjectBase._before_close
100     @retry_method
101     def readline(self, size=float('inf'), num_retries=None):
102         cache_pos, cache_data = self._readline_cache
103         if self.tell() == cache_pos:
104             data = [cache_data]
105         else:
106             data = ['']
107         data_size = len(data[-1])
108         while (data_size < size) and ('\n' not in data[-1]):
109             next_read = self.read(2 ** 20, num_retries=num_retries)
110             if not next_read:
111                 break
112             data.append(next_read)
113             data_size += len(next_read)
114         data = ''.join(data)
115         try:
116             nextline_index = data.index('\n') + 1
117         except ValueError:
118             nextline_index = len(data)
119         nextline_index = min(nextline_index, size)
120         self._readline_cache = (self.tell(), data[nextline_index:])
121         return data[:nextline_index]
122
123     @_FileLikeObjectBase._before_close
124     @retry_method
125     def decompress(self, decompress, size, num_retries=None):
126         for segment in self.readall(size, num_retries=num_retries):
127             data = decompress(segment)
128             if data:
129                 yield data
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def readall_decompressed(self, size=2**20, num_retries=None):
134         self.seek(0)
135         if self.name.endswith('.bz2'):
136             dc = bz2.BZ2Decompressor()
137             return self.decompress(dc.decompress, size,
138                                    num_retries=num_retries)
139         elif self.name.endswith('.gz'):
140             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
141             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
142                                    size, num_retries=num_retries)
143         else:
144             return self.readall(size, num_retries=num_retries)
145
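    # Example (illustrative): readall_decompressed() chooses a decompressor
    # from the file name extension, so compressed files can be streamed
    # transparently.
    #
    #     with coll.open("logs.gz") as f:            # coll.open(): hypothetical helper
    #         for chunk in f.readall_decompressed():
    #             sink.write(chunk)                  # sink: placeholder for any writer
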
146     @_FileLikeObjectBase._before_close
147     @retry_method
148     def readlines(self, sizehint=float('inf'), num_retries=None):
149         data = []
150         data_size = 0
151         for s in self.readall(num_retries=num_retries):
152             data.append(s)
153             data_size += len(s)
154             if data_size >= sizehint:
155                 break
156         return ''.join(data).splitlines(True)
157
158     def size(self):
159         raise NotImplementedError()
160
161     def read(self, size, num_retries=None):
162         raise NotImplementedError()
163
164     def readfrom(self, start, size, num_retries=None):
165         raise NotImplementedError()
166
167
168 class StreamFileReader(ArvadosFileReaderBase):
169     class _NameAttribute(str):
170         # The Python file API provides a plain .name attribute.
171         # Older SDK provided a name() method.
172         # This class provides both, for maximum compatibility.
173         def __call__(self):
174             return self
175
176     def __init__(self, stream, segments, name):
177         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
178         self._stream = stream
179         self.segments = segments
180
181     def stream_name(self):
182         return self._stream.name()
183
184     def size(self):
185         n = self.segments[-1]
186         return n.range_start + n.range_size
187
188     @_FileLikeObjectBase._before_close
189     @retry_method
190     def read(self, size, num_retries=None):
191         """Read up to 'size' bytes from the stream, starting at the current file position"""
192         if size == 0:
193             return ''
194
195         data = ''
196         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
197         if available_chunks:
198             lr = available_chunks[0]
199             data = self._stream.readfrom(lr.locator+lr.segment_offset,
200                                           lr.segment_size,
201                                           num_retries=num_retries)
202
203         self._filepos += len(data)
204         return data
205
206     @_FileLikeObjectBase._before_close
207     @retry_method
208     def readfrom(self, start, size, num_retries=None):
209         """Read up to 'size' bytes from the stream, starting at 'start'"""
210         if size == 0:
211             return ''
212
213         data = []
214         for lr in locators_and_ranges(self.segments, start, size):
215             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
216                                               num_retries=num_retries))
217         return ''.join(data)
218
219     def as_manifest(self):
220         segs = []
221         for r in self.segments:
222             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
223         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
224
225
226 def synchronized(orig_func):
227     @functools.wraps(orig_func)
228     def synchronized_wrapper(self, *args, **kwargs):
229         with self.lock:
230             return orig_func(self, *args, **kwargs)
231     return synchronized_wrapper
232
233 class _BufferBlock(object):
234     """A stand-in for a Keep block that is in the process of being written.
235
236     Writers can append to it, get the size, and compute the Keep locator.
237     There are three valid states:
238
239     WRITABLE
240       Can append to block.
241
242     PENDING
243       Block is in the process of being uploaded to Keep, append is an error.
244
245     COMMITTED
246       The block has been written to Keep and its internal buffer has been
247       released; the block must now be fetched via the Keep client (since we
248       discarded the internal copy), and identifiers referring to the BufferBlock
249       can be replaced with the block locator.
250
251     """
252
253     WRITABLE = 0
254     PENDING = 1
255     COMMITTED = 2
256
257     def __init__(self, blockid, starting_capacity, owner):
258         """
259         :blockid:
260           the identifier for this block
261
262         :starting_capacity:
263           the initial buffer capacity
264
265         :owner:
266           ArvadosFile that owns this block
267
268         """
269         self.blockid = blockid
270         self.buffer_block = bytearray(starting_capacity)
271         self.buffer_view = memoryview(self.buffer_block)
272         self.write_pointer = 0
273         self._state = _BufferBlock.WRITABLE
274         self._locator = None
275         self.owner = owner
276         self.lock = threading.Lock()
277
278     @synchronized
279     def append(self, data):
280         """Append some data to the buffer.
281
282         Only valid if the block is in WRITABLE state.  Implements an expanding
283         buffer, doubling its capacity as needed to accommodate all the data.
284
285         """
286         if self._state == _BufferBlock.WRITABLE:
287             while (self.write_pointer+len(data)) > len(self.buffer_block):
288                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
289                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
290                 self.buffer_block = new_buffer_block
291                 self.buffer_view = memoryview(self.buffer_block)
292             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
293             self.write_pointer += len(data)
294             self._locator = None
295         else:
296             raise AssertionError("Buffer block is not writable")
297
298     @synchronized
299     def set_state(self, nextstate, loc=None):
300         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
301             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
302             self._state = nextstate
303             if self._state == _BufferBlock.COMMITTED:
304                 self._locator = loc
305                 self.buffer_view = None
306                 self.buffer_block = None
307         else:
308             raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
309
310     @synchronized
311     def state(self):
312         return self._state
313
314     def size(self):
315         """The amount of data written to the buffer."""
316         return self.write_pointer
317
318     @synchronized
319     def locator(self):
320         """The Keep locator for this buffer's contents."""
321         if self._locator is None:
322             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
323         return self._locator
324
325     @synchronized
326     def clone(self, new_blockid, owner):
327         if self._state == _BufferBlock.COMMITTED:
328             raise AssertionError("Can only duplicate a writable or pending buffer block")
329         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
330         bufferblock.append(self.buffer_view[0:self.size()])
331         return bufferblock
332
333
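# Example (illustrative): the expected _BufferBlock lifecycle, as enforced by
# set_state().
#
#     bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#     bb.append("hello")                          # WRITABLE: appends allowed
#     bb.set_state(_BufferBlock.PENDING)          # upload in progress; appends now fail
#     bb.set_state(_BufferBlock.COMMITTED, loc)   # loc: locator string returned by Keep (placeholder)
#     bb.locator()                                # now returns loc
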
334 class NoopLock(object):
335     def __enter__(self):
336         return self
337
338     def __exit__(self, exc_type, exc_value, traceback):
339         pass
340
341     def acquire(self, blocking=False):
342         pass
343
344     def release(self):
345         pass
346
347
348 def must_be_writable(orig_func):
349     @functools.wraps(orig_func)
350     def must_be_writable_wrapper(self, *args, **kwargs):
351         if not self.writable():
352             raise IOError(errno.EROFS, "Collection must be writable.")
353         return orig_func(self, *args, **kwargs)
354     return must_be_writable_wrapper
355
356
357 class _BlockManager(object):
358     """BlockManager handles buffer blocks.
359
360     Also handles background block uploads, and background block prefetch for a
361     Collection of ArvadosFiles.
362
363     """
364     def __init__(self, keep):
365         """keep: KeepClient object to use"""
366         self._keep = keep
367         self._bufferblocks = {}
368         self._put_queue = None
369         self._put_errors = None
370         self._put_threads = None
371         self._prefetch_queue = None
372         self._prefetch_threads = None
373         self.lock = threading.Lock()
374         self.prefetch_enabled = True
375         self.num_put_threads = 2
376         self.num_get_threads = 2
377
378     @synchronized
379     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
380         """Allocate a new, empty bufferblock in WRITABLE state and return it.
381
382         :blockid:
383           optional block identifier, otherwise one will be automatically assigned
384
385         :starting_capacity:
386           optional capacity, otherwise will use default capacity
387
388         :owner:
389           ArvadosFile that owns this block
390
391         """
392         if blockid is None:
393             blockid = "bufferblock%i" % len(self._bufferblocks)
394         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
395         self._bufferblocks[bufferblock.blockid] = bufferblock
396         return bufferblock
397
398     @synchronized
399     def dup_block(self, block, owner):
400         """Create a new bufferblock initialized with the content of an existing bufferblock.
401
402         :block:
403           the buffer block to copy.
404
405         :owner:
406           ArvadosFile that owns the new block
407
408         """
409         new_blockid = "bufferblock%i" % len(self._bufferblocks)
410         bufferblock = block.clone(new_blockid, owner)
411         self._bufferblocks[bufferblock.blockid] = bufferblock
412         return bufferblock
413
414     @synchronized
415     def is_bufferblock(self, locator):
416         return locator in self._bufferblocks
417
418     @synchronized
419     def stop_threads(self):
420         """Shut down and wait for background upload and download threads to finish."""
421
422         if self._put_threads is not None:
423             for t in self._put_threads:
424                 self._put_queue.put(None)
425             for t in self._put_threads:
426                 t.join()
427         self._put_threads = None
428         self._put_queue = None
429         self._put_errors = None
430
431         if self._prefetch_threads is not None:
432             for t in self._prefetch_threads:
433                 self._prefetch_queue.put(None)
434             for t in self._prefetch_threads:
435                 t.join()
436         self._prefetch_threads = None
437         self._prefetch_queue = None
438
439     def commit_bufferblock(self, block):
440         """Initiate a background upload of a bufferblock.
441
442         This will block if the upload queue is at capacity, otherwise it will
443         return immediately.
444
445         """
446
447         def commit_bufferblock_worker(self):
448             """Background uploader thread."""
449
450             while True:
451                 try:
452                     bufferblock = self._put_queue.get()
453                     if bufferblock is None:
454                         return
455                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
456                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
457
458                 except Exception as e:
459                     self._put_errors.put((bufferblock.locator(), e))
460                 finally:
461                     if self._put_queue is not None:
462                         self._put_queue.task_done()
463
464         with self.lock:
465             if self._put_threads is None:
466                 # Start uploader threads.
467
468                 # If we don't limit the Queue size, the upload queue can quickly
469                 # grow to take up gigabytes of RAM if the writing process is
470                 # generating data more quickly than it can be sent to the Keep
471                 # servers.
472                 #
473                 # With two upload threads and a queue size of 2, this means up to 4
474                 # blocks pending.  If they are full 64 MiB blocks, that means up to
475                 # 256 MiB of internal buffering, which is the same size as the
476                 # default download block cache in KeepClient.
477                 self._put_queue = Queue.Queue(maxsize=2)
478                 self._put_errors = Queue.Queue()
479
480                 self._put_threads = []
481                 for i in xrange(0, self.num_put_threads):
482                     thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
483                     self._put_threads.append(thread)
484                     thread.daemon = True
485                     thread.start()
486
487         if block.state() == _BufferBlock.WRITABLE:
488             # Mark the block as PENDING so as to disallow any more appends.
489             block.set_state(_BufferBlock.PENDING)
490             self._put_queue.put(block)
491
492     @synchronized
493     def get_bufferblock(self, locator):
494         return self._bufferblocks.get(locator)
495
496     def get_block_contents(self, locator, num_retries, cache_only=False):
497         """Fetch a block.
498
499         First checks to see if the locator is a BufferBlock and returns its
500         contents if so; otherwise, passes the request through to KeepClient.get().
501
502         """
503         with self.lock:
504             if locator in self._bufferblocks:
505                 bufferblock = self._bufferblocks[locator]
506                 if bufferblock.state() != _BufferBlock.COMMITTED:
507                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
508                 else:
509                     locator = bufferblock._locator
510         if cache_only:
511             return self._keep.get_from_cache(locator)
512         else:
513             return self._keep.get(locator, num_retries=num_retries)
514
515     def commit_all(self):
516         """Commit all outstanding buffer blocks.
517
518         Unlike commit_bufferblock(), this is a synchronous call, and will not
519         return until all buffer blocks are uploaded.  Raises
520         KeepWriteError() if any blocks failed to upload.
521
522         """
523         with self.lock:
524             items = self._bufferblocks.items()
525
526         for k,v in items:
527             v.owner.flush()
528
529         with self.lock:
530             if self._put_queue is not None:
531                 self._put_queue.join()
532
533                 if not self._put_errors.empty():
534                     err = []
535                     try:
536                         while True:
537                             err.append(self._put_errors.get(False))
538                     except Queue.Empty:
539                         pass
540                     raise KeepWriteError("Error writing some blocks", err, label="block")
541
542     def block_prefetch(self, locator):
543         """Initiate a background download of a block.
544
545         This assumes that the underlying KeepClient implements a block cache,
546         so repeated requests for the same block will not result in repeated
547         downloads (unless the block is evicted from the cache).  This method
548         does not block.
549
550         """
551
552         if not self.prefetch_enabled:
553             return
554
555         def block_prefetch_worker(self):
556             """The background downloader thread."""
557             while True:
558                 try:
559                     b = self._prefetch_queue.get()
560                     if b is None:
561                         return
562                     self._keep.get(b)
563                 except Exception:
564                     pass
565
566         with self.lock:
567             if locator in self._bufferblocks:
568                 return
569             if self._prefetch_threads is None:
570                 self._prefetch_queue = Queue.Queue()
571                 self._prefetch_threads = []
572                 for i in xrange(0, self.num_get_threads):
573                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
574                     self._prefetch_threads.append(thread)
575                     thread.daemon = True
576                     thread.start()
577         self._prefetch_queue.put(locator)
578
579
580 class ArvadosFile(object):
581     """Represent a file in a Collection.
582
583     ArvadosFile manages the underlying representation of a file in Keep as a
584     sequence of segments spanning a set of blocks, and implements random
585     read/write access.
586
587     This object may be accessed from multiple threads.
588
589     """
590
591     def __init__(self, parent, stream=[], segments=[]):
592         """
593         ArvadosFile constructor.
594
595         :stream:
596           a list of Range objects representing a block stream
597
598         :segments:
599           a list of Range objects representing segments
600         """
601         self.parent = parent
602         self._modified = True
603         self._segments = []
604         self.lock = parent.root_collection().lock
605         for s in segments:
606             self._add_segment(stream, s.locator, s.range_size)
607         self._current_bblock = None
608
609     def writable(self):
610         return self.parent.writable()
611
612     @synchronized
613     def segments(self):
614         return copy.copy(self._segments)
615
616     @synchronized
617     def clone(self, new_parent):
618         """Make a copy of this file."""
619         cp = ArvadosFile(new_parent)
620         cp.replace_contents(self)
621         return cp
622
623     @must_be_writable
624     @synchronized
625     def replace_contents(self, other):
626         """Replace segments of this file with segments from another `ArvadosFile` object."""
627
628         map_loc = {}
629         self._segments = []
630         for other_segment in other.segments():
631             new_loc = other_segment.locator
632             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
633                 if other_segment.locator not in map_loc:
634                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
635                     if bufferblock.state() != _BufferBlock.WRITABLE:
636                         map_loc[other_segment.locator] = bufferblock.locator()
637                     else:
638                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
639                 new_loc = map_loc[other_segment.locator]
640
641             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
642
643         self._modified = True
644
645     def __eq__(self, other):
646         if other is self:
647             return True
648         if not isinstance(other, ArvadosFile):
649             return False
650
651         othersegs = other.segments()
652         with self.lock:
653             if len(self._segments) != len(othersegs):
654                 return False
655             for i in xrange(0, len(othersegs)):
656                 seg1 = self._segments[i]
657                 seg2 = othersegs[i]
658                 loc1 = seg1.locator
659                 loc2 = seg2.locator
660
661                 if self.parent._my_block_manager().is_bufferblock(loc1):
662                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
663
664                 if other.parent._my_block_manager().is_bufferblock(loc2):
665                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
666
667                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
668                     seg1.range_start != seg2.range_start or
669                     seg1.range_size != seg2.range_size or
670                     seg1.segment_offset != seg2.segment_offset):
671                     return False
672
673         return True
674
675     def __ne__(self, other):
676         return not self.__eq__(other)
677
678     @synchronized
679     def set_unmodified(self):
680         """Clear the modified flag"""
681         self._modified = False
682
683     @synchronized
684     def modified(self):
685         """Test the modified flag"""
686         return self._modified
687
688     @must_be_writable
689     @synchronized
690     def truncate(self, size):
691         """Shrink the size of the file.
692
693         If `size` is less than the size of the file, the file contents after
694         `size` will be discarded.  If `size` is greater than the current size
695         of the file, an IOError will be raised.
696
697         """
698         if size < self.size():
699             new_segs = []
700             for r in self._segments:
701                 range_end = r.range_start+r.range_size
702                 if r.range_start >= size:
703                     # segment is past the truncate size, all done
704                     break
705                 elif size < range_end:
706                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
707                     nr.segment_offset = r.segment_offset
708                     new_segs.append(nr)
709                     break
710                 else:
711                     new_segs.append(r)
712
713             self._segments = new_segs
714             self._modified = True
715         elif size > self.size():
716             raise IOError("truncate() does not support extending the file size")
717
718
719     def readfrom(self, offset, size, num_retries, exact=False):
720         """Read up to `size` bytes from the file starting at `offset`.
721
722         :exact:
723          If False (default), return less data than requested if the read
724          crosses a block boundary and the next block isn't cached.  If True,
725          only return less data than requested when hitting EOF.
726         """
727
728         with self.lock:
729             if size == 0 or offset >= self.size():
730                 return ''
731             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
732             readsegs = locators_and_ranges(self._segments, offset, size)
733
734         for lr in prefetch:
735             self.parent._my_block_manager().block_prefetch(lr.locator)
736
737         data = []
738         for lr in readsegs:
739             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
740             if block:
741                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
742             else:
743                 break
744         return ''.join(data)
745
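    # Example (illustrative): the `exact` flag trades completeness for latency.
    #
    #     af.readfrom(0, 2**26, num_retries=1)              # may return fewer bytes
    #                                                       # if the next block is not cached
    #     af.readfrom(0, 2**26, num_retries=1, exact=True)  # short read only at EOF
    #
    # `af` stands for any ArvadosFile instance.
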
746     def _repack_writes(self, num_retries):
747         """Test if the buffer block has more data than actual segments.
748
749         This happens when a buffered write overwrites a file range written in
750         a previous buffered write.  Re-pack the buffer block for efficiency
751         and to avoid leaking information.
752
753         """
754         segs = self._segments
755
756         # Sum up the segments to get the total bytes of the file referencing
757         # into the buffer block.
758         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
759         write_total = sum([s.range_size for s in bufferblock_segs])
760
761         if write_total < self._current_bblock.size():
762             # There is more data in the buffer block than is actually accounted for by segments, so
763             # re-pack the data by copying it into a new buffer block.
764             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
765             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
766             for t in bufferblock_segs:
767                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
768                 t.segment_offset = new_bb.size() - t.range_size
769
770             self._current_bblock = new_bb
771
772     @must_be_writable
773     @synchronized
774     def writeto(self, offset, data, num_retries):
775         """Write `data` to the file starting at `offset`.
776
777         This will update existing bytes and/or extend the size of the file as
778         necessary.
779
780         """
781         if len(data) == 0:
782             return
783
784         if offset > self.size():
785             raise ArgumentError("Offset is past the end of the file")
786
787         if len(data) > config.KEEP_BLOCK_SIZE:
788             # Chunk it up into smaller writes
789             n = 0
790             dataview = memoryview(data)
791             while (n + config.KEEP_BLOCK_SIZE) < len(data):
792                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
793                 n += config.KEEP_BLOCK_SIZE
794             if n < len(data):
795                 self.writeto(offset+n, dataview[n:].tobytes(), num_retries)
796             return
797
798         self._modified = True
799
800         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
801             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
802
803         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
804             self._repack_writes(num_retries)
805             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
806                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
807                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
808
809         self._current_bblock.append(data)
810
811         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
812
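    # Example (illustrative): random-access writes go through writeto(); data
    # accumulates in the current buffer block and is pushed to Keep by flush()
    # or once the block would exceed config.KEEP_BLOCK_SIZE.
    #
    #     af.writeto(0, "hello world", num_retries=0)   # af: a writable ArvadosFile
    #     af.writeto(6, "keep!", num_retries=0)         # overwrites bytes 6-10
    #     af.flush()                                    # commit the buffer block to Keep
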
813     @synchronized
814     def flush(self, num_retries=0):
815         if self._current_bblock:
816             self._repack_writes(num_retries)
817             self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
818
819     @must_be_writable
820     @synchronized
821     def add_segment(self, blocks, pos, size):
822         """Add a segment to the end of the file.
823
824         `pos` and `size` reference a section of the stream described by
825         `blocks` (a list of Range objects)
826
827         """
828         self._add_segment(blocks, pos, size)
829
830     def _add_segment(self, blocks, pos, size):
831         """Internal implementation of add_segment."""
832         self._modified = True
833         for lr in locators_and_ranges(blocks, pos, size):
834             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
835             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
836             self._segments.append(r)
837
838     @synchronized
839     def size(self):
840         """Get the file size."""
841         if self._segments:
842             n = self._segments[-1]
843             return n.range_start + n.range_size
844         else:
845             return 0
846
847     @synchronized
848     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
849         buf = ""
850         filestream = []
851         for segment in self._segments:
852             loc = segment.locator
853             if self.parent._my_block_manager().is_bufferblock(loc):
854                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
855             if portable_locators:
856                 loc = KeepLocator(loc).stripped()
857             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
858                                  segment.segment_offset, segment.range_size))
859         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
860         buf += "\n"
861         return buf
862
863
864 class ArvadosFileReader(ArvadosFileReaderBase):
865     """Wraps ArvadosFile in a file-like object supporting reading only.
866
867     Be aware that this class is NOT thread safe as there is no locking around
868     updating the file pointer.
869
870     """
871
872     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
873         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
874         self.arvadosfile = arvadosfile
875
876     def size(self):
877         return self.arvadosfile.size()
878
879     def stream_name(self):
880         return self.arvadosfile.parent.stream_name()
881
882     @_FileLikeObjectBase._before_close
883     @retry_method
884     def read(self, size, num_retries=None):
885         """Read up to `size` bytes from the stream, starting at the current file position."""
886         data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
887         self._filepos += len(data)
888         return data
889
890     @_FileLikeObjectBase._before_close
891     @retry_method
892     def readfrom(self, offset, size, num_retries=None):
893         """Read up to `size` bytes from the stream, starting at the current file position."""
894         return self.arvadosfile.readfrom(offset, size, num_retries)
895
896     def flush(self):
897         pass
898
899
900 class ArvadosFileWriter(ArvadosFileReader):
901     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
902
903     Be aware that this class is NOT thread safe as there is no locking around
904     updating the file pointer.
905
906     """
907
908     def __init__(self, arvadosfile, name, mode, num_retries=None):
909         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
910
911     @_FileLikeObjectBase._before_close
912     @retry_method
913     def write(self, data, num_retries=None):
914         if self.mode[0] == "a":
915             self.arvadosfile.writeto(self.size(), data, num_retries)
916         else:
917             self.arvadosfile.writeto(self._filepos, data, num_retries)
918             self._filepos += len(data)
919
920     @_FileLikeObjectBase._before_close
921     @retry_method
922     def writelines(self, seq, num_retries=None):
923         for s in seq:
924             self.write(s, num_retries)
925
926     @_FileLikeObjectBase._before_close
927     def truncate(self, size=None):
928         if size is None:
929             size = self._filepos
930         self.arvadosfile.truncate(size)
931         if self._filepos > self.size():
932             self._filepos = self.size()
933
934     @_FileLikeObjectBase._before_close
935     def flush(self):
936         self.arvadosfile.flush()
937
938     def close(self):
939         if not self.closed:
940             self.flush()
941             super(ArvadosFileWriter, self).close()
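
# Example (illustrative): ArvadosFileReader and ArvadosFileWriter are thin
# file-like wrappers around ArvadosFile.  In practice they are obtained from a
# Collection's open() method (assumed here; the Collection API is not defined
# in this module) rather than constructed directly:
#
#     with collection.open("output.txt", "w") as out:    # ArvadosFileWriter
#         out.write("line one\n")
#         out.writelines(["line two\n", "line three\n"])
#
#     with collection.open("output.txt", "r") as f:      # ArvadosFileReader
#         for line in f:
#             print line,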