1 import functools
2 import os
3 import zlib
4 import bz2
5 import re
6 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
7 from arvados.retry import retry_method
8 from . import config
9 import hashlib
10 import threading
11 import Queue
12 import copy
13 import errno
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17
18 def split(path):
19     """Separate the stream name and file name in a /-separated stream path and
20     return a tuple (stream_name, file_name).
21
22     If no stream name is available, assume '.'.
23
24     """
25     try:
26         stream_name, file_name = path.rsplit('/', 1)
27     except ValueError:  # No / in string
28         stream_name, file_name = '.', path
29     return stream_name, file_name
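# For illustration, the two cases handled above look like this:
#   split("stream/dir/file.txt")  -> ("stream/dir", "file.txt")
#   split("file.txt")             -> (".", "file.txt")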
30
31 class _FileLikeObjectBase(object):
32     def __init__(self, name, mode):
33         self.name = name
34         self.mode = mode
35         self.closed = False
36
37     @staticmethod
38     def _before_close(orig_func):
39         @functools.wraps(orig_func)
40         def before_close_wrapper(self, *args, **kwargs):
41             if self.closed:
42                 raise ValueError("I/O operation on closed stream file")
43             return orig_func(self, *args, **kwargs)
44         return before_close_wrapper
45
46     def __enter__(self):
47         return self
48
49     def __exit__(self, exc_type, exc_value, traceback):
50         try:
51             self.close()
52         except Exception:
53             if exc_type is None:
54                 raise
55
56     def close(self):
57         self.closed = True
58
59
60 class ArvadosFileReaderBase(_FileLikeObjectBase):
61     class _NameAttribute(str):
62         # The Python file API provides a plain .name attribute.
63         # Older SDK provided a name() method.
64         # This class provides both, for maximum compatibility.
65         def __call__(self):
66             return self
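        # Illustrative: for a reader `r` opened on "file.txt", both spellings
        # work and return the same string: r.name == r.name() == "file.txt".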
67
68     def __init__(self, name, mode, num_retries=None):
69         super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
70         self._filepos = 0L
71         self.num_retries = num_retries
72         self._readline_cache = (None, None)
73
74     def __iter__(self):
75         while True:
76             data = self.readline()
77             if not data:
78                 break
79             yield data
80
81     def decompressed_name(self):
82         return re.sub(r'\.(bz2|gz)$', '', self.name)
83
84     @_FileLikeObjectBase._before_close
85     def seek(self, pos, whence=os.SEEK_CUR):
86         if whence == os.SEEK_CUR:
87             pos += self._filepos
88         elif whence == os.SEEK_END:
89             pos += self.size()
90         self._filepos = min(max(pos, 0L), self.size())
91
92     def tell(self):
93         return self._filepos
94
95     @_FileLikeObjectBase._before_close
96     @retry_method
97     def readall(self, size=2**20, num_retries=None):
98         while True:
99             data = self.read(size, num_retries=num_retries)
100             if data == '':
101                 break
102             yield data
103
104     @_FileLikeObjectBase._before_close
105     @retry_method
106     def readline(self, size=float('inf'), num_retries=None):
107         cache_pos, cache_data = self._readline_cache
108         if self.tell() == cache_pos:
109             data = [cache_data]
110         else:
111             data = ['']
112         data_size = len(data[-1])
113         while (data_size < size) and ('\n' not in data[-1]):
114             next_read = self.read(2 ** 20, num_retries=num_retries)
115             if not next_read:
116                 break
117             data.append(next_read)
118             data_size += len(next_read)
119         data = ''.join(data)
120         try:
121             nextline_index = data.index('\n') + 1
122         except ValueError:
123             nextline_index = len(data)
124         nextline_index = min(nextline_index, size)
125         self._readline_cache = (self.tell(), data[nextline_index:])
126         return data[:nextline_index]
127
128     @_FileLikeObjectBase._before_close
129     @retry_method
130     def decompress(self, decompress, size, num_retries=None):
131         for segment in self.readall(size, num_retries):
132             data = decompress(segment)
133             if data:
134                 yield data
135
136     @_FileLikeObjectBase._before_close
137     @retry_method
138     def readall_decompressed(self, size=2**20, num_retries=None):
139         self.seek(0)
140         if self.name.endswith('.bz2'):
141             dc = bz2.BZ2Decompressor()
142             return self.decompress(dc.decompress, size,
143                                    num_retries=num_retries)
144         elif self.name.endswith('.gz'):
145             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
146             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
147                                    size, num_retries=num_retries)
148         else:
149             return self.readall(size, num_retries=num_retries)
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def readlines(self, sizehint=float('inf'), num_retries=None):
154         data = []
155         data_size = 0
156         for s in self.readall(num_retries=num_retries):
157             data.append(s)
158             data_size += len(s)
159             if data_size >= sizehint:
160                 break
161         return ''.join(data).splitlines(True)
162
163     def size(self):
164         raise NotImplementedError()
165
166     def read(self, size, num_retries=None):
167         raise NotImplementedError()
168
169     def readfrom(self, start, size, num_retries=None):
170         raise NotImplementedError()
171
172
173 class StreamFileReader(ArvadosFileReaderBase):
174     def __init__(self, stream, segments, name):
175         super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
176         self._stream = stream
177         self.segments = segments
178
179     def stream_name(self):
180         return self._stream.name()
181
182     def size(self):
183         n = self.segments[-1]
184         return n.range_start + n.range_size
185
186     @_FileLikeObjectBase._before_close
187     @retry_method
188     def read(self, size, num_retries=None):
189         """Read up to 'size' bytes from the stream, starting at the current file position"""
190         if size == 0:
191             return ''
192
193         data = ''
194         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
195         if available_chunks:
196             lr = available_chunks[0]
197             data = self._stream._readfrom(lr.locator+lr.segment_offset,
198                                           lr.segment_size,
199                                           num_retries=num_retries)
200
201         self._filepos += len(data)
202         return data
203
204     @_FileLikeObjectBase._before_close
205     @retry_method
206     def readfrom(self, start, size, num_retries=None):
207         """Read up to 'size' bytes from the stream, starting at 'start'"""
208         if size == 0:
209             return ''
210
211         data = []
212         for lr in locators_and_ranges(self.segments, start, size):
213             data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
214                                               num_retries=num_retries))
215         return ''.join(data)
216
217     def as_manifest(self):
218         segs = []
219         for r in self.segments:
220             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
221         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
222
223
224 def synchronized(orig_func):
225     @functools.wraps(orig_func)
226     def synchronized_wrapper(self, *args, **kwargs):
227         with self.lock:
228             return orig_func(self, *args, **kwargs)
229     return synchronized_wrapper
230
231 class _BufferBlock(object):
232     """A BufferBlock is a stand-in for a Keep block that is in the process of being
233     written.
234
235     Writers can append to it, get the size, and compute the Keep locator.
236     There are three valid states:
237
238     WRITABLE
239       Can append to block.
240
241     PENDING
242       Block is in the process of being uploaded to Keep, append is an error.
243
244     COMMITTED
245       The block has been written to Keep, its internal buffer has been
246       released, fetching the block will fetch it via keep client (since we
247       discarded the internal copy), and identifiers referring to the BufferBlock
248       can be replaced with the block locator.
249
250     """
251
252     WRITABLE = 0
253     PENDING = 1
254     COMMITTED = 2
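    # Rough lifecycle sketch (illustrative; `manager` and `f` stand in for a
    # _BlockManager and the owning ArvadosFile, they are not defined here):
    #
    #   bb = manager.alloc_bufferblock(owner=f)   # starts WRITABLE
    #   bb.append("some bytes")                   # expanding in-memory buffer
    #   manager.commit_bufferblock(bb)            # -> PENDING, background upload
    #   # once the upload finishes, the worker calls
    #   # bb.set_state(_BufferBlock.COMMITTED, loc) and the buffer is released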
255
256     def __init__(self, blockid, starting_capacity, owner):
257         """
258         :blockid:
259           the identifier for this block
260
261         :starting_capacity:
262           the initial buffer capacity
263
264         :owner:
265           ArvadosFile that owns this block
266
267         """
268         self.blockid = blockid
269         self.buffer_block = bytearray(starting_capacity)
270         self.buffer_view = memoryview(self.buffer_block)
271         self.write_pointer = 0
272         self._state = _BufferBlock.WRITABLE
273         self._locator = None
274         self.owner = owner
275         self.lock = threading.Lock()
276
277     @synchronized
278     def append(self, data):
279         """Append some data to the buffer.
280
281         Only valid if the block is in WRITABLE state.  Implements an expanding
282         buffer, doubling capacity as needed to accommodate all the data.
283
284         """
285         if self._state == _BufferBlock.WRITABLE:
286             while (self.write_pointer+len(data)) > len(self.buffer_block):
287                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
288                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
289                 self.buffer_block = new_buffer_block
290                 self.buffer_view = memoryview(self.buffer_block)
291             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
292             self.write_pointer += len(data)
293             self._locator = None
294         else:
295             raise AssertionError("Buffer block is not writable")
296
297     @synchronized
298     def set_state(self, nextstate, loc=None):
299         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
300             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
301             self._state = nextstate
302             if self._state == _BufferBlock.COMMITTED:
303                 self._locator = loc
304                 self.buffer_view = None
305                 self.buffer_block = None
306         else:
307             raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
308
309     @synchronized
310     def state(self):
311         return self._state
312
313     def size(self):
314         """The amount of data written to the buffer."""
315         return self.write_pointer
316
317     @synchronized
318     def locator(self):
319         """The Keep locator for this buffer's contents."""
320         if self._locator is None:
321             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
322         return self._locator
323
324
325 class NoopLock(object):
326     def __enter__(self):
327         return self
328
329     def __exit__(self, exc_type, exc_value, traceback):
330         pass
331
332     def acquire(self, blocking=False):
333         pass
334
335     def release(self):
336         pass
337
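# Collection synchronization modes, as interpreted by must_be_writable() below
# and by ArvadosFileWriter.close() at the end of this file:
#   SYNC_READONLY -- writes are rejected with EROFS.
#   SYNC_EXPLICIT -- (assumed) changes stay buffered until the owning
#                    collection is saved explicitly.
#   SYNC_LIVE     -- closing a writer saves the parent collection immediately.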
338 SYNC_READONLY = 1
339 SYNC_EXPLICIT = 2
340 SYNC_LIVE = 3
341
342 def must_be_writable(orig_func):
343     @functools.wraps(orig_func)
344     def must_be_writable_wrapper(self, *args, **kwargs):
345         if self.sync_mode() == SYNC_READONLY:
346             raise IOError(errno.EROFS, "Collection is read only")
347         return orig_func(self, *args, **kwargs)
348     return must_be_writable_wrapper
349
350
351 class _BlockManager(object):
352     """BlockManager handles buffer blocks, background block uploads, and background
353     block prefetch for a Collection of ArvadosFiles.
354
355     """
356     def __init__(self, keep):
357         """keep: KeepClient object to use"""
358         self._keep = keep
359         self._bufferblocks = {}
360         self._put_queue = None
361         self._put_errors = None
362         self._put_threads = None
363         self._prefetch_queue = None
364         self._prefetch_threads = None
365         self.lock = threading.Lock()
366         self.prefetch_enabled = True
367         self.num_put_threads = 2
368         self.num_get_threads = 2
369
370     @synchronized
371     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
372         """Allocate a new, empty bufferblock in WRITABLE state and return it.
373
374         :blockid:
375           optional block identifier, otherwise one will be automatically assigned
376
377         :starting_capacity:
378           optional capacity, otherwise will use default capacity
379
380         :owner:
381           ArvadosFile that owns this block
382
383         """
384         if blockid is None:
385             blockid = "bufferblock%i" % len(self._bufferblocks)
386         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
387         self._bufferblocks[bufferblock.blockid] = bufferblock
388         return bufferblock
389
390     @synchronized
391     def dup_block(self, block, owner):
392         """Create a new bufferblock in WRITABLE state, initialized with the content of
393         an existing bufferblock.
394
395         :block:
396           the buffer block to copy.
397
398         :owner:
399           ArvadosFile that owns the new block
400
401         """
402         new_blockid = "bufferblock%i" % len(self._bufferblocks)
403         with block.lock:
404             if block._state == _BufferBlock.COMMITTED:
405                 raise AssertionError("Can only duplicate a writable or pending buffer block")
406
407             bufferblock = _BufferBlock(new_blockid, block.size(), owner)
408             bufferblock.append(block.buffer_view[0:block.size()])
409         self._bufferblocks[bufferblock.blockid] = bufferblock
410         return bufferblock
411
412     @synchronized
413     def is_bufferblock(self, locator):
414         return locator in self._bufferblocks
415
416     @synchronized
417     def stop_threads(self):
418         """Shut down and wait for background upload and download threads to finish."""
419
420         if self._put_threads is not None:
421             for t in self._put_threads:
422                 self._put_queue.put(None)
423             for t in self._put_threads:
424                 t.join()
425         self._put_threads = None
426         self._put_queue = None
427         self._put_errors = None
428
429         if self._prefetch_threads is not None:
430             for t in self._prefetch_threads:
431                 self._prefetch_queue.put(None)
432             for t in self._prefetch_threads:
433                 t.join()
434         self._prefetch_threads = None
435         self._prefetch_queue = None
436
437     def commit_bufferblock(self, block):
438         """Initiate a background upload of a bufferblock.
439
440         This will block if the upload queue is at capacity, otherwise it will
441         return immediately.
442
443         """
444
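        # In outline: the caller flips the block to PENDING and enqueues it;
        # one of num_put_threads background workers defined below picks it up,
        # calls KeepClient.put(), and moves the block to COMMITTED with the
        # locator that Keep returned.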
445         def commit_bufferblock_worker(self):
446             """Background uploader thread."""
447
448             while True:
449                 try:
450                     bufferblock = self._put_queue.get()
451                     if bufferblock is None:
452                         return
453                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
454                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
455
456                 except Exception as e:
457                     print e
458                     self._put_errors.put((bufferblock.locator(), e))
459                 finally:
460                     if self._put_queue is not None:
461                         self._put_queue.task_done()
462
463         with self.lock:
464             if self._put_threads is None:
465                 # Start uploader threads.
466
467                 # If we don't limit the Queue size, the upload queue can quickly
468                 # grow to take up gigabytes of RAM if the writing process is
469                 # generating data more quickly than it can be sent to the Keep
470                 # servers.
471                 #
472                 # With two upload threads and a queue size of 2, this means up to 4
473                 # blocks pending.  If they are full 64 MiB blocks, that means up to
474                 # 256 MiB of internal buffering, which is the same size as the
475                 # default download block cache in KeepClient.
476                 self._put_queue = Queue.Queue(maxsize=2)
477                 self._put_errors = Queue.Queue()
478
479                 self._put_threads = []
480                 for i in xrange(0, self.num_put_threads):
481                     thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
482                     self._put_threads.append(thread)
483                     thread.daemon = True
484                     thread.start()
485
486         # Mark the block as PENDING to disallow any more appends.
487         block.set_state(_BufferBlock.PENDING)
488         self._put_queue.put(block)
489
490     @synchronized
491     def get_bufferblock(self, locator):
492         return self._bufferblocks.get(locator)
493
494     def get_block_contents(self, locator, num_retries, cache_only=False):
495         """Fetch a block.
496
497         First checks whether the locator refers to a BufferBlock and returns its
498         contents if so; otherwise passes the request through to KeepClient.get().
499
500         """
501         with self.lock:
502             if locator in self._bufferblocks:
503                 bufferblock = self._bufferblocks[locator]
504                 if bufferblock.state() != _BufferBlock.COMMITTED:
505                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
506                 else:
507                     locator = bufferblock._locator
508         if cache_only:
509             return self._keep.get_from_cache(locator)
510         else:
511             return self._keep.get(locator, num_retries=num_retries)
512
513     def commit_all(self):
514         """Commit all outstanding buffer blocks.
515
516         Unlike commit_bufferblock(), this is a synchronous call, and will not
517         return until all buffer blocks are uploaded.  Raises
518         KeepWriteError() if any blocks failed to upload.
519
520         """
521         with self.lock:
522             items = self._bufferblocks.items()
523
524         for k,v in items:
525             if v.state() == _BufferBlock.WRITABLE:
526                 self.commit_bufferblock(v)
527
528         with self.lock:
529             if self._put_queue is not None:
530                 self._put_queue.join()
531
532                 if not self._put_errors.empty():
533                     err = []
534                     try:
535                         while True:
536                             err.append(self._put_errors.get(False))
537                     except Queue.Empty:
538                         pass
539                     raise KeepWriteError("Error writing some blocks", err)
540
541     def block_prefetch(self, locator):
542         """Initiate a background download of a block.
543
544         This assumes that the underlying KeepClient implements a block cache,
545         so repeated requests for the same block will not result in repeated
546         downloads (unless the block is evicted from the cache).  This method
547         does not block.
548
549         """
550
551         if not self.prefetch_enabled:
552             return
553
554         def block_prefetch_worker(self):
555             """The background downloader thread."""
556             while True:
557                 try:
558                     b = self._prefetch_queue.get()
559                     if b is None:
560                         return
561                     self._keep.get(b)
562                 except:
563                     pass
564
565         with self.lock:
566             if locator in self._bufferblocks:
567                 return
568             if self._prefetch_threads is None:
569                 self._prefetch_queue = Queue.Queue()
570                 self._prefetch_threads = []
571                 for i in xrange(0, self.num_get_threads):
572                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
573                     self._prefetch_threads.append(thread)
574                     thread.daemon = True
575                     thread.start()
576         self._prefetch_queue.put(locator)
577
578
579 class ArvadosFile(object):
580     """ArvadosFile manages the underlying representation of a file in Keep as a
581     sequence of segments spanning a set of blocks, and implements random
582     read/write access.
583
584     This object may be accessed from multiple threads.
585
586     """
587
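    # Minimal usage sketch (illustrative; `coll` stands for a parent collection
    # object providing root_collection(), sync_mode() and _my_block_manager(),
    # which is how this class is driven in practice):
    #
    #   f = ArvadosFile(coll)
    #   f.writeto(0, "hello world", num_retries=2)   # buffered in a _BufferBlock
    #   f.readfrom(0, 5, num_retries=2)              # -> "hello"
    #   f.truncate(5)                                # keep only the first 5 bytes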
588     def __init__(self, parent, stream=[], segments=[]):
589         """
590         ArvadosFile constructor.
591
592         :stream:
593           a list of Range objects representing a block stream
594
595         :segments:
596           a list of Range objects representing segments
597         """
598         self.parent = parent
599         self._modified = True
600         self._segments = []
601         self.lock = parent.root_collection().lock
602         for s in segments:
603             self._add_segment(stream, s.locator, s.range_size)
604         self._current_bblock = None
605
606     def sync_mode(self):
607         return self.parent.sync_mode()
608
609     @synchronized
610     def segments(self):
611         return copy.copy(self._segments)
612
613     @synchronized
614     def clone(self, new_parent):
615         """Make a copy of this file."""
616         cp = ArvadosFile(new_parent)
617         cp.replace_contents(self)
618         return cp
619
620     @must_be_writable
621     @synchronized
622     def replace_contents(self, other):
623         """Replace segments of this file with segments from another `ArvadosFile` object."""
624
625         map_loc = {}
626         self._segments = []
627         for other_segment in other.segments():
628             new_loc = other_segment.locator
629             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
630                 if other_segment.locator not in map_loc:
631                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
632                     if bufferblock.state() != _BufferBlock.WRITABLE:
633                         map_loc[other_segment.locator] = bufferblock.locator()
634                     else:
635                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
636                 new_loc = map_loc[other_segment.locator]
637
638             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
639
640         self._modified = True
641
642     def __eq__(self, other):
643         if other is self:
644             return True
645         if not isinstance(other, ArvadosFile):
646             return False
647
648         othersegs = other.segments()
649         with self.lock:
650             if len(self._segments) != len(othersegs):
651                 return False
652             for i in xrange(0, len(othersegs)):
653                 seg1 = self._segments[i]
654                 seg2 = othersegs[i]
655                 loc1 = seg1.locator
656                 loc2 = seg2.locator
657
658                 if self.parent._my_block_manager().is_bufferblock(loc1):
659                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
660
661                 if other.parent._my_block_manager().is_bufferblock(loc2):
662                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
663
664                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
665                     seg1.range_start != seg2.range_start or
666                     seg1.range_size != seg2.range_size or
667                     seg1.segment_offset != seg2.segment_offset):
668                     return False
669
670         return True
671
672     def __ne__(self, other):
673         return not self.__eq__(other)
674
675     @synchronized
676     def set_unmodified(self):
677         """Clear the modified flag"""
678         self._modified = False
679
680     @synchronized
681     def modified(self):
682         """Test the modified flag"""
683         return self._modified
684
685     @must_be_writable
686     @synchronized
687     def truncate(self, size):
688         """Shrink the size of the file.
689
690         If `size` is less than the size of the file, the file contents after
691         `size` will be discarded.  If `size` is greater than the current size
692         of the file, an IOError will be raised.
693
694         """
695         if size < self.size():
696             new_segs = []
697             for r in self._segments:
698                 range_end = r.range_start+r.range_size
699                 if r.range_start >= size:
700                     # segment is past the truncate size, all done
701                     break
702                 elif size < range_end:
703                     nr = Range(r.locator, r.range_start, size - r.range_start)
704                     nr.segment_offset = r.segment_offset
705                     new_segs.append(nr)
706                     break
707                 else:
708                     new_segs.append(r)
709
710             self._segments = new_segs
711             self._modified = True
712         elif size > self.size():
713             raise IOError("truncate() does not support extending the file size")
714
715     def readfrom(self, offset, size, num_retries):
716         """Read up to `size` bytes from the file starting at `offset`."""
717
718         with self.lock:
719             if size == 0 or offset >= self.size():
720                 return ''
721             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
722             readsegs = locators_and_ranges(self._segments, offset, size)
723
724         for lr in prefetch:
725             self.parent._my_block_manager().block_prefetch(lr.locator)
726
727         data = []
728         for lr in readsegs:
729             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
730             if block:
731                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
732             else:
733                 break
734         return ''.join(data)
735
736     def _repack_writes(self):
737         """Test if the buffer block has more data than is referenced by actual
738         segments.
739
740         This happens when a buffered write over-writes a file range written in
741         a previous buffered write.  Re-pack the buffer block for efficiency
742         and to avoid leaking information.
743
744         """
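        # Worked example (illustrative): write 10 bytes, then overwrite the last
        # 5 of them.  The buffer block now holds 15 bytes, but the segments only
        # reference 10, so write_total (10) is less than the block size (15) and
        # the live bytes are copied into a fresh, tightly packed buffer block.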
745         segs = self._segments
746
747         # Sum up the segments to get the total bytes of the file referencing
748         # into the buffer block.
749         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
750         write_total = sum([s.range_size for s in bufferblock_segs])
751
752         if write_total < self._current_bblock.size():
753             # There is more data in the buffer block than is actually accounted for by segments, so
754             # re-pack into a new buffer by copying over to a new buffer block.
755             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
756             for t in bufferblock_segs:
757                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
758                 t.segment_offset = new_bb.size() - t.range_size
759
760             self._current_bblock = new_bb
761
762     @must_be_writable
763     @synchronized
764     def writeto(self, offset, data, num_retries):
765         """Write `data` to the file starting at `offset`.
766
767         This will update existing bytes and/or extend the size of the file as
768         necessary.
769
770         """
771         if len(data) == 0:
772             return
773
774         if offset > self.size():
775             raise ArgumentError("Offset is past the end of the file")
776
777         if len(data) > config.KEEP_BLOCK_SIZE:
778             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
779
780         self._modified = True
781
782         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
783             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
784
785         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
786             self._repack_writes()
787             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
788                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
789                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
790
791         self._current_bblock.append(data)
792
793         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
794
795     @must_be_writable
796     @synchronized
797     def add_segment(self, blocks, pos, size):
798         """Add a segment to the end of the file, with `pos` and `size` referencing a
799         section of the stream described by `blocks` (a list of Range objects)
800
801         """
802         self._add_segment(blocks, pos, size)
803
804     def _add_segment(self, blocks, pos, size):
805         """Internal implementation of add_segment."""
806         self._modified = True
807         for lr in locators_and_ranges(blocks, pos, size):
808             last = self._segments[-1] if self._segments else Range(0, 0, 0)
809             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
810             self._segments.append(r)
811
812     @synchronized
813     def size(self):
814         """Get the file size."""
815         if self._segments:
816             n = self._segments[-1]
817             return n.range_start + n.range_size
818         else:
819             return 0
820
821
822     @synchronized
823     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
824         buf = ""
825         filestream = []
826         for segment in self._segments:
827             loc = segment.locator
828             if loc.startswith("bufferblock"):
829                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
830             if portable_locators:
831                 loc = KeepLocator(loc).stripped()
832             filestream.append(LocatorAndRange(loc, int(loc.split('+')[1]),  # block size field of the locator
833                                  segment.segment_offset, segment.range_size))
834         stream = {stream_name: filestream}
835         buf += ' '.join(normalize_stream(stream_name, stream))
836         buf += "\n"
837         return buf
838
839
840 class ArvadosFileReader(ArvadosFileReaderBase):
841     """Wraps ArvadosFile in a file-like object supporting reading only.
842
843     Be aware that this class is NOT thread safe as there is no locking around
844     updates to the file pointer.
845
846     """
847
848     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
849         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
850         self.arvadosfile = arvadosfile
851
852     def size(self):
853         return self.arvadosfile.size()
854
855     @_FileLikeObjectBase._before_close
856     @retry_method
857     def read(self, size, num_retries=None):
858         """Read up to `size` bytes from the stream, starting at the current file position."""
859         data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
860         self._filepos += len(data)
861         return data
862
863     @_FileLikeObjectBase._before_close
864     @retry_method
865     def readfrom(self, offset, size, num_retries=None):
866         """Read up to `size` bytes from the stream, starting at `offset`."""
867         return self.arvadosfile.readfrom(offset, size, num_retries)
868
869     def flush(self):
870         pass
871
872
873 class ArvadosFileWriter(ArvadosFileReader):
874     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
875
876     Be aware that this class is NOT thread safe as there is no locking around
877     updates to the file pointer.
878
879     """
880
881     def __init__(self, arvadosfile, name, mode, num_retries=None):
882         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
883
884     @_FileLikeObjectBase._before_close
885     @retry_method
886     def write(self, data, num_retries=None):
887         if self.mode[0] == "a":
888             self.arvadosfile.writeto(self.size(), data, num_retries)
889         else:
890             self.arvadosfile.writeto(self._filepos, data, num_retries)
891             self._filepos += len(data)
892
893     @_FileLikeObjectBase._before_close
894     @retry_method
895     def writelines(self, seq, num_retries=None):
896         for s in seq:
897             self.write(s, num_retries)
898
899     def truncate(self, size=None):
900         if size is None:
901             size = self._filepos
902         self.arvadosfile.truncate(size)
903         if self._filepos > self.size():
904             self._filepos = self.size()
905
906     def close(self):
907         if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
908             self.arvadosfile.parent.root_collection().save()
909         super(ArvadosFileWriter, self).close()
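
# Minimal end-to-end sketch (hedged): in practice these wrappers are handed out
# by a collection object rather than constructed directly; the
# `collection.open(...)` factory below is a hypothetical entry point, not
# something defined in this module.
#
#   with collection.open("md5sum.txt", "a") as f:   # yields an ArvadosFileWriter
#       f.write("d41d8cd98f00b204e9800998ecf8427e  foo.txt\n")
#   # In SYNC_LIVE mode, close() (run by the `with` block) saves the collection.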