[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import re
4 import zlib
5 import bz2
6 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
7 from arvados.retry import retry_method
8 import config
9 import hashlib
10 import threading
11 import Queue
12 import copy
13 import errno
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
16
17 def split(path):
18     """split(path) -> streamname, filename
19
20     Separate the stream name and file name in a /-separated stream path and
21     return a tuple (stream_name, file_name).  If no stream name is available,
22     assume '.'.
23
24     """
25     try:
26         stream_name, file_name = path.rsplit('/', 1)
27     except ValueError:  # No / in string
28         stream_name, file_name = '.', path
29     return stream_name, file_name
30
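# Example: given a /-separated stream path, split() yields the stream name and
# the file name, e.g. split("data/reads/sample1.fastq") returns
# ("data/reads", "sample1.fastq") and split("sample1.fastq") returns
# (".", "sample1.fastq").
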
31 class _FileLikeObjectBase(object):
32     def __init__(self, name, mode):
33         self.name = name
34         self.mode = mode
35         self.closed = False
36
37     @staticmethod
38     def _before_close(orig_func):
39         @functools.wraps(orig_func)
40         def before_close_wrapper(self, *args, **kwargs):
41             if self.closed:
42                 raise ValueError("I/O operation on closed stream file")
43             return orig_func(self, *args, **kwargs)
44         return before_close_wrapper
45
46     def __enter__(self):
47         return self
48
49     def __exit__(self, exc_type, exc_value, traceback):
50         try:
51             self.close()
52         except Exception:
53             if exc_type is None:
54                 raise
55
56     def close(self):
57         self.closed = True
58
59
60 class ArvadosFileReaderBase(_FileLikeObjectBase):
61     def __init__(self, name, mode, num_retries=None):
62         super(ArvadosFileReaderBase, self).__init__(name, mode)
63         self._filepos = 0L
64         self.num_retries = num_retries
65         self._readline_cache = (None, None)
66
67     def __iter__(self):
68         while True:
69             data = self.readline()
70             if not data:
71                 break
72             yield data
73
74     def decompressed_name(self):
75         return re.sub(r'\.(bz2|gz)$', '', self.name)
76
77     @_FileLikeObjectBase._before_close
78     def seek(self, pos, whence=os.SEEK_CUR):
79         if whence == os.SEEK_CUR:
80             pos += self._filepos
81         elif whence == os.SEEK_END:
82             pos += self.size()
83         self._filepos = min(max(pos, 0L), self.size())
84
85     def tell(self):
86         return self._filepos
87
88     @_FileLikeObjectBase._before_close
89     @retry_method
90     def readall(self, size=2**20, num_retries=None):
91         while True:
92             data = self.read(size, num_retries=num_retries)
93             if data == '':
94                 break
95             yield data
96
97     @_FileLikeObjectBase._before_close
98     @retry_method
99     def readline(self, size=float('inf'), num_retries=None):
100         cache_pos, cache_data = self._readline_cache
101         if self.tell() == cache_pos:
102             data = [cache_data]
103         else:
104             data = ['']
105         data_size = len(data[-1])
106         while (data_size < size) and ('\n' not in data[-1]):
107             next_read = self.read(2 ** 20, num_retries=num_retries)
108             if not next_read:
109                 break
110             data.append(next_read)
111             data_size += len(next_read)
112         data = ''.join(data)
113         try:
114             nextline_index = data.index('\n') + 1
115         except ValueError:
116             nextline_index = len(data)
117         nextline_index = min(nextline_index, size)
118         self._readline_cache = (self.tell(), data[nextline_index:])
119         return data[:nextline_index]
120
121     @_FileLikeObjectBase._before_close
122     @retry_method
123     def decompress(self, decompress, size, num_retries=None):
124         for segment in self.readall(size, num_retries):
125             data = decompress(segment)
126             if data:
127                 yield data
128
129     @_FileLikeObjectBase._before_close
130     @retry_method
131     def readall_decompressed(self, size=2**20, num_retries=None):
132         self.seek(0)
133         if self.name.endswith('.bz2'):
134             dc = bz2.BZ2Decompressor()
135             return self.decompress(dc.decompress, size,
136                                    num_retries=num_retries)
137         elif self.name.endswith('.gz'):
138             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
139             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
140                                    size, num_retries=num_retries)
141         else:
142             return self.readall(size, num_retries=num_retries)
143
144     @_FileLikeObjectBase._before_close
145     @retry_method
146     def readlines(self, sizehint=float('inf'), num_retries=None):
147         data = []
148         data_size = 0
149         for s in self.readall(num_retries=num_retries):
150             data.append(s)
151             data_size += len(s)
152             if data_size >= sizehint:
153                 break
154         return ''.join(data).splitlines(True)
155
156     def size(self):
157         raise NotImplementedError()
158
159     def read(self, size, num_retries=None):
160         raise NotImplementedError()
161
162     def readfrom(self, start, size, num_retries=None):
163         raise NotImplementedError()
164
165
166 class StreamFileReader(ArvadosFileReaderBase):
167     class _NameAttribute(str):
168         # The Python file API provides a plain .name attribute.
169         # Older SDK provided a name() method.
170         # This class provides both, for maximum compatibility.
171         def __call__(self):
172             return self
173
174     def __init__(self, stream, segments, name):
175         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
176         self._stream = stream
177         self.segments = segments
178
179     def stream_name(self):
180         return self._stream.name()
181
182     def size(self):
183         n = self.segments[-1]
184         return n.range_start + n.range_size
185
186     @_FileLikeObjectBase._before_close
187     @retry_method
188     def read(self, size, num_retries=None):
189         """Read up to 'size' bytes from the stream, starting at the current file position"""
190         if size == 0:
191             return ''
192
193         data = ''
194         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
195         if available_chunks:
196             lr = available_chunks[0]
197             data = self._stream.readfrom(lr.locator+lr.segment_offset,
198                                           lr.segment_size,
199                                           num_retries=num_retries)
200
201         self._filepos += len(data)
202         return data
203
204     @_FileLikeObjectBase._before_close
205     @retry_method
206     def readfrom(self, start, size, num_retries=None):
207         """Read up to 'size' bytes from the stream, starting at 'start'"""
208         if size == 0:
209             return ''
210
211         data = []
212         for lr in locators_and_ranges(self.segments, start, size):
213             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
214                                               num_retries=num_retries))
215         return ''.join(data)
216
217     def as_manifest(self):
218         segs = []
219         for r in self.segments:
220             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
221         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
222
223
224 def synchronized(orig_func):
225     @functools.wraps(orig_func)
226     def synchronized_wrapper(self, *args, **kwargs):
227         with self.lock:
228             return orig_func(self, *args, **kwargs)
229     return synchronized_wrapper
230
231 class _BufferBlock(object):
232     """A stand-in for a Keep block that is in the process of being written.
233
234     Writers can append to it, get the size, and compute the Keep locator.
235     There are three valid states:
236
237     WRITABLE
238       Can append to block.
239
240     PENDING
241       Block is in the process of being uploaded to Keep, append is an error.
242
243     COMMITTED
244       The block has been written to Keep, its internal buffer has been
245       released, fetching the block will fetch it via keep client (since we
246       discarded the internal copy), and identifiers referring to the BufferBlock
247       can be replaced with the block locator.
248
249     """
250
251     WRITABLE = 0
252     PENDING = 1
253     COMMITTED = 2
254
255     def __init__(self, blockid, starting_capacity, owner):
256         """
257         :blockid:
258           the identifier for this block
259
260         :starting_capacity:
261           the initial buffer capacity
262
263         :owner:
264           ArvadosFile that owns this block
265
266         """
267         self.blockid = blockid
268         self.buffer_block = bytearray(starting_capacity)
269         self.buffer_view = memoryview(self.buffer_block)
270         self.write_pointer = 0
271         self._state = _BufferBlock.WRITABLE
272         self._locator = None
273         self.owner = owner
274         self.lock = threading.Lock()
275
276     @synchronized
277     def append(self, data):
278         """Append some data to the buffer.
279
280         Only valid if the block is in WRITABLE state.  Implements an expanding
281         buffer, doubling capacity as needed to accommodate all the data.
282
283         """
284         if self._state == _BufferBlock.WRITABLE:
285             while (self.write_pointer+len(data)) > len(self.buffer_block):
286                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
287                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
288                 self.buffer_block = new_buffer_block
289                 self.buffer_view = memoryview(self.buffer_block)
290             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
291             self.write_pointer += len(data)
292             self._locator = None
293         else:
294             raise AssertionError("Buffer block is not writable")
295
296     @synchronized
297     def set_state(self, nextstate, loc=None):
298         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
299             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
300             self._state = nextstate
301             if self._state == _BufferBlock.COMMITTED:
302                 self._locator = loc
303                 self.buffer_view = None
304                 self.buffer_block = None
305         else:
306             raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
307
308     @synchronized
309     def state(self):
310         return self._state
311
312     def size(self):
313         """The amount of data written to the buffer."""
314         return self.write_pointer
315
316     @synchronized
317     def locator(self):
318         """The Keep locator for this buffer's contents."""
319         if self._locator is None:
320             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
321         return self._locator
322
323     @synchronized
324     def clone(self, new_blockid, owner):
325         if self._state == _BufferBlock.COMMITTED:
326             raise AssertionError("Can only duplicate a writable or pending buffer block")
327         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
328         bufferblock.append(self.buffer_view[0:self.size()])
329         return bufferblock
330
331
332 class NoopLock(object):
333     def __enter__(self):
334         return self
335
336     def __exit__(self, exc_type, exc_value, traceback):
337         pass
338
339     def acquire(self, blocking=False):
340         pass
341
342     def release(self):
343         pass
344
345
346 def must_be_writable(orig_func):
347     @functools.wraps(orig_func)
348     def must_be_writable_wrapper(self, *args, **kwargs):
349         if not self.writable():
350             raise IOError(errno.EROFS, "Collection must be writable.")
351         return orig_func(self, *args, **kwargs)
352     return must_be_writable_wrapper
353
354
355 class _BlockManager(object):
356     """BlockManager handles buffer blocks.
357
358     Also handles background block uploads, and background block prefetch for a
359     Collection of ArvadosFiles.
360
361     """
362     def __init__(self, keep):
363         """keep: KeepClient object to use"""
364         self._keep = keep
365         self._bufferblocks = {}
366         self._put_queue = None
367         self._put_errors = None
368         self._put_threads = None
369         self._prefetch_queue = None
370         self._prefetch_threads = None
371         self.lock = threading.Lock()
372         self.prefetch_enabled = True
373         self.num_put_threads = 2
374         self.num_get_threads = 2
375
376     @synchronized
377     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
378         """Allocate a new, empty bufferblock in WRITABLE state and return it.
379
380         :blockid:
381           optional block identifier, otherwise one will be automatically assigned
382
383         :starting_capacity:
384           optional capacity, otherwise will use default capacity
385
386         :owner:
387           ArvadosFile that owns this block
388
389         """
390         if blockid is None:
391             blockid = "bufferblock%i" % len(self._bufferblocks)
392         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
393         self._bufferblocks[bufferblock.blockid] = bufferblock
394         return bufferblock
395
396     @synchronized
397     def dup_block(self, block, owner):
398         """Create a new bufferblock initialized with the content of an existing bufferblock.
399
400         :block:
401           the buffer block to copy.
402
403         :owner:
404           ArvadosFile that owns the new block
405
406         """
407         new_blockid = "bufferblock%i" % len(self._bufferblocks)
408         bufferblock = block.clone(new_blockid, owner)
409         self._bufferblocks[bufferblock.blockid] = bufferblock
410         return bufferblock
411
412     @synchronized
413     def is_bufferblock(self, locator):
414         return locator in self._bufferblocks
415
416     @synchronized
417     def stop_threads(self):
418         """Shut down and wait for background upload and download threads to finish."""
419
420         if self._put_threads is not None:
421             for t in self._put_threads:
422                 self._put_queue.put(None)
423             for t in self._put_threads:
424                 t.join()
425         self._put_threads = None
426         self._put_queue = None
427         self._put_errors = None
428
429         if self._prefetch_threads is not None:
430             for t in self._prefetch_threads:
431                 self._prefetch_queue.put(None)
432             for t in self._prefetch_threads:
433                 t.join()
434         self._prefetch_threads = None
435         self._prefetch_queue = None
436
437     def commit_bufferblock(self, block):
438         """Initiate a background upload of a bufferblock.
439
440         This will block if the upload queue is at capacity, otherwise it will
441         return immediately.
442
443         """
444
445         def commit_bufferblock_worker(self):
446             """Background uploader thread."""
447
448             while True:
449                 try:
450                     bufferblock = self._put_queue.get()
451                     if bufferblock is None:
452                         return
453                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
454                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
455
456                 except Exception as e:
457                     self._put_errors.put((bufferblock.locator(), e))
458                 finally:
459                     if self._put_queue is not None:
460                         self._put_queue.task_done()
461
462         with self.lock:
463             if self._put_threads is None:
464                 # Start uploader threads.
465
466                 # If we don't limit the Queue size, the upload queue can quickly
467                 # grow to take up gigabytes of RAM if the writing process is
468                 # generating data more quickly than it can be sent to the Keep
469                 # servers.
470                 #
471                 # With two upload threads and a queue size of 2, this means up to 4
472                 # blocks pending.  If they are full 64 MiB blocks, that means up to
473                 # 256 MiB of internal buffering, which is the same size as the
474                 # default download block cache in KeepClient.
475                 self._put_queue = Queue.Queue(maxsize=2)
476                 self._put_errors = Queue.Queue()
477
478                 self._put_threads = []
479                 for i in xrange(0, self.num_put_threads):
480                     thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
481                     self._put_threads.append(thread)
482                     thread.daemon = True
483                     thread.start()
484
485         # Mark the block as PENDING to disallow any more appends.
486         block.set_state(_BufferBlock.PENDING)
487         self._put_queue.put(block)
488
489     @synchronized
490     def get_bufferblock(self, locator):
491         return self._bufferblocks.get(locator)
492
493     def get_block_contents(self, locator, num_retries, cache_only=False):
494         """Fetch a block.
495
496         First checks to see if the locator refers to a BufferBlock and returns its
497         contents if so; otherwise passes the request through to KeepClient.get().
498
499         """
500         with self.lock:
501             if locator in self._bufferblocks:
502                 bufferblock = self._bufferblocks[locator]
503                 if bufferblock.state() != _BufferBlock.COMMITTED:
504                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
505                 else:
506                     locator = bufferblock._locator
507         if cache_only:
508             return self._keep.get_from_cache(locator)
509         else:
510             return self._keep.get(locator, num_retries=num_retries)
511
512     def commit_all(self):
513         """Commit all outstanding buffer blocks.
514
515         Unlike commit_bufferblock(), this is a synchronous call, and will not
516         return until all buffer blocks are uploaded.  Raises
517         KeepWriteError() if any blocks failed to upload.
518
519         """
520         with self.lock:
521             items = self._bufferblocks.items()
522
523         for k,v in items:
524             if v.state() == _BufferBlock.WRITABLE:
525                 self.commit_bufferblock(v)
526
527         with self.lock:
528             if self._put_queue is not None:
529                 self._put_queue.join()
530
531                 if not self._put_errors.empty():
532                     err = []
533                     try:
534                         while True:
535                             err.append(self._put_errors.get(False))
536                     except Queue.Empty:
537                         pass
538                     raise KeepWriteError("Error writing some blocks", err, label="block")
539
540     def block_prefetch(self, locator):
541         """Initiate a background download of a block.
542
543         This assumes that the underlying KeepClient implements a block cache,
544         so repeated requests for the same block will not result in repeated
545         downloads (unless the block is evicted from the cache.)  This method
546         does not block.
547
548         """
549
550         if not self.prefetch_enabled:
551             return
552
553         def block_prefetch_worker(self):
554             """The background downloader thread."""
555             while True:
556                 try:
557                     b = self._prefetch_queue.get()
558                     if b is None:
559                         return
560                     self._keep.get(b)
561                 except Exception:
562                     pass
563
564         with self.lock:
565             if locator in self._bufferblocks:
566                 return
567             if self._prefetch_threads is None:
568                 self._prefetch_queue = Queue.Queue()
569                 self._prefetch_threads = []
570                 for i in xrange(0, self.num_get_threads):
571                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
572                     self._prefetch_threads.append(thread)
573                     thread.daemon = True
574                     thread.start()
575         self._prefetch_queue.put(locator)
576
577
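# A minimal usage sketch for the classes above (assuming `keep_client` is a
# KeepClient instance); it shows a buffer block moving through the
# WRITABLE -> PENDING -> COMMITTED states:
#
#   block_manager = _BlockManager(keep_client)
#   bufferblock = block_manager.alloc_bufferblock()
#   bufferblock.append("hello world\n")            # WRITABLE: data accumulates in memory
#   block_manager.commit_bufferblock(bufferblock)  # PENDING: queued for background upload
#   block_manager.commit_all()                     # waits until the block is COMMITTED
#   locator = bufferblock.locator()                # now a real Keep locator
#   block_manager.stop_threads()
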
578 class ArvadosFile(object):
579     """Represent a file in a Collection.
580
581     ArvadosFile manages the underlying representation of a file in Keep as a
582     sequence of segments spanning a set of blocks, and implements random
583     read/write access.
584
585     This object may be accessed from multiple threads.
586
587     """
588
589     def __init__(self, parent, stream=[], segments=[]):
590         """
591         ArvadosFile constructor.
592
593         :stream:
594           a list of Range objects representing a block stream
595
596         :segments:
597           a list of Range objects representing segments
598         """
599         self.parent = parent
600         self._modified = True
601         self._segments = []
602         self.lock = parent.root_collection().lock
603         for s in segments:
604             self._add_segment(stream, s.locator, s.range_size)
605         self._current_bblock = None
606
607     def writable(self):
608         return self.parent.writable()
609
610     @synchronized
611     def segments(self):
612         return copy.copy(self._segments)
613
614     @synchronized
615     def clone(self, new_parent):
616         """Make a copy of this file."""
617         cp = ArvadosFile(new_parent)
618         cp.replace_contents(self)
619         return cp
620
621     @must_be_writable
622     @synchronized
623     def replace_contents(self, other):
624         """Replace segments of this file with segments from another `ArvadosFile` object."""
625
626         map_loc = {}
627         self._segments = []
628         for other_segment in other.segments():
629             new_loc = other_segment.locator
630             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
631                 if other_segment.locator not in map_loc:
632                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
633                     if bufferblock.state() != _BufferBlock.WRITABLE:
634                         map_loc[other_segment.locator] = bufferblock.locator()
635                     else:
636                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
637                 new_loc = map_loc[other_segment.locator]
638
639             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
640
641         self._modified = True
642
643     def __eq__(self, other):
644         if other is self:
645             return True
646         if not isinstance(other, ArvadosFile):
647             return False
648
649         othersegs = other.segments()
650         with self.lock:
651             if len(self._segments) != len(othersegs):
652                 return False
653             for i in xrange(0, len(othersegs)):
654                 seg1 = self._segments[i]
655                 seg2 = othersegs[i]
656                 loc1 = seg1.locator
657                 loc2 = seg2.locator
658
659                 if self.parent._my_block_manager().is_bufferblock(loc1):
660                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
661
662                 if other.parent._my_block_manager().is_bufferblock(loc2):
663                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
664
665                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
666                     seg1.range_start != seg2.range_start or
667                     seg1.range_size != seg2.range_size or
668                     seg1.segment_offset != seg2.segment_offset):
669                     return False
670
671         return True
672
673     def __ne__(self, other):
674         return not self.__eq__(other)
675
676     @synchronized
677     def set_unmodified(self):
678         """Clear the modified flag"""
679         self._modified = False
680
681     @synchronized
682     def modified(self):
683         """Test the modified flag"""
684         return self._modified
685
686     @must_be_writable
687     @synchronized
688     def truncate(self, size):
689         """Shrink the size of the file.
690
691         If `size` is less than the size of the file, the file contents after
692         `size` will be discarded.  If `size` is greater than the current size
693         of the file, an IOError will be raised.
694
695         """
696         if size < self.size():
697             new_segs = []
698             for r in self._segments:
699                 range_end = r.range_start+r.range_size
700                 if r.range_start >= size:
701                     # segment is past the truncate size, all done
702                     break
703                 elif size < range_end:
704                     nr = Range(r.locator, r.range_start, size - r.range_start)
705                     nr.segment_offset = r.segment_offset
706                     new_segs.append(nr)
707                     break
708                 else:
709                     new_segs.append(r)
710
711             self._segments = new_segs
712             self._modified = True
713         elif size > self.size():
714             raise IOError("truncate() does not support extending the file size")
715
716     def readfrom(self, offset, size, num_retries):
717         """Read up to `size` bytes from the file starting at `offset`."""
718
719         with self.lock:
720             if size == 0 or offset >= self.size():
721                 return ''
722             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
723             readsegs = locators_and_ranges(self._segments, offset, size)
724
725         for lr in prefetch:
726             self.parent._my_block_manager().block_prefetch(lr.locator)
727
728         data = []
729         for lr in readsegs:
730             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
731             if block:
732                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
733             else:
734                 break
735         return ''.join(data)
736
737     def _repack_writes(self):
738         """Re-pack the buffer block if it holds more data than its segments reference.
739
740         This happens when a buffered write overwrites a file range written by
741         a previous buffered write.  Re-packing copies only the still-referenced
742         bytes into a new buffer block, for efficiency and to avoid leaking stale data.
743
744         """
745         segs = self._segments
746
747         # Sum up the segments to get the total bytes of the file referencing
748         # into the buffer block.
749         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
750         write_total = sum([s.range_size for s in bufferblock_segs])
751
752         if write_total < self._current_bblock.size():
753             # There is more data in the buffer block than is actually accounted for by segments, so
754             # re-pack into a new buffer by copying over to a new buffer block.
755             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
756             for t in bufferblock_segs:
757                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
758                 t.segment_offset = new_bb.size() - t.range_size
759
760             self._current_bblock = new_bb
761
762     @must_be_writable
763     @synchronized
764     def writeto(self, offset, data, num_retries):
765         """Write `data` to the file starting at `offset`.
766
767         This will update existing bytes and/or extend the size of the file as
768         necessary.
769
770         """
771         if len(data) == 0:
772             return
773
774         if offset > self.size():
775             raise ArgumentError("Offset is past the end of the file")
776
777         if len(data) > config.KEEP_BLOCK_SIZE:
778             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
779
780         self._modified = True
781
782         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
783             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
784
785         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
786             self._repack_writes()
787             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
788                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
789                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
790
791         self._current_bblock.append(data)
792
793         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
794
795     @synchronized
796     def flush(self):
797         if self._current_bblock:
798             self._repack_writes()
799             self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
800
801     @must_be_writable
802     @synchronized
803     def add_segment(self, blocks, pos, size):
804         """Add a segment to the end of the file.
805
806         `pos` and `size` reference a section of the stream described by
807         `blocks` (a list of Range objects).
808
809         """
810         self._add_segment(blocks, pos, size)
811
812     def _add_segment(self, blocks, pos, size):
813         """Internal implementation of add_segment."""
814         self._modified = True
815         for lr in locators_and_ranges(blocks, pos, size):
816             last = self._segments[-1] if self._segments else Range(0, 0, 0)
817             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
818             self._segments.append(r)
819
820     @synchronized
821     def size(self):
822         """Get the file size."""
823         if self._segments:
824             n = self._segments[-1]
825             return n.range_start + n.range_size
826         else:
827             return 0
828
829     @synchronized
830     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
831         buf = ""
832         filestream = []
833         for segment in self._segments:
834             loc = segment.locator
835             if self.parent._my_block_manager().is_bufferblock(loc):
836                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
837             if portable_locators:
838                 loc = KeepLocator(loc).stripped()
839             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
840                                  segment.segment_offset, segment.range_size))
841         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
842         buf += "\n"
843         return buf
844
845
846 class ArvadosFileReader(ArvadosFileReaderBase):
847     """Wraps ArvadosFile in a file-like object supporting reading only.
848
849     Be aware that this class is NOT thread safe as there is no locking around
850     updates to the file pointer.
851
852     """
853
854     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
855         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
856         self.arvadosfile = arvadosfile
857
858     def size(self):
859         return self.arvadosfile.size()
860
861     def stream_name(self):
862         return self.arvadosfile.parent.stream_name()
863
864     @_FileLikeObjectBase._before_close
865     @retry_method
866     def read(self, size, num_retries=None):
867         """Read up to `size` bytes from the stream, starting at the current file position."""
868         data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
869         self._filepos += len(data)
870         return data
871
872     @_FileLikeObjectBase._before_close
873     @retry_method
874     def readfrom(self, offset, size, num_retries=None):
875         """Read up to `size` bytes from the stream, starting at `offset`."""
876         return self.arvadosfile.readfrom(offset, size, num_retries)
877
878     def flush(self):
879         pass
880
881
882 class ArvadosFileWriter(ArvadosFileReader):
883     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
884
885     Be aware that this class is NOT thread safe as there is no locking around
886     updates to the file pointer.
887
888     """
889
890     def __init__(self, arvadosfile, name, mode, num_retries=None):
891         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
892
893     @_FileLikeObjectBase._before_close
894     @retry_method
895     def write(self, data, num_retries=None):
896         if self.mode[0] == "a":
897             self.arvadosfile.writeto(self.size(), data, num_retries)
898         else:
899             self.arvadosfile.writeto(self._filepos, data, num_retries)
900             self._filepos += len(data)
901
902     @_FileLikeObjectBase._before_close
903     @retry_method
904     def writelines(self, seq, num_retries=None):
905         for s in seq:
906             self.write(s, num_retries)
907
908     @_FileLikeObjectBase._before_close
909     def truncate(self, size=None):
910         if size is None:
911             size = self._filepos
912         self.arvadosfile.truncate(size)
913         if self._filepos > self.size():
914             self._filepos = self.size()
915
916     @_FileLikeObjectBase._before_close
917     def flush(self):
918         self.arvadosfile.flush()
919
920     def close(self):
921         if not self.closed:
922             self.flush()
923             super(ArvadosFileWriter, self).close()
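
# Illustrative end-to-end sketch: these reader/writer classes are normally
# obtained through a Collection object rather than constructed directly.
# Assuming arvados.collection.Collection.open() (part of the collection API,
# not defined in this module) returns the ArvadosFileWriter/ArvadosFileReader
# wrappers above:
#
#   import arvados.collection
#   coll = arvados.collection.Collection()
#   with coll.open("hello.txt", "w") as writer:    # ArvadosFileWriter
#       writer.write("hello world\n")
#   with coll.open("hello.txt", "r") as reader:    # ArvadosFileReader
#       print reader.read(2**16)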