3198: Store filename/directory name in ArvadosFile/Subcollection object.
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12
13 from .errors import KeepWriteError, AssertionError, ArgumentError
14 from .keep import KeepLocator
15 from ._normalize_stream import normalize_stream
16 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
17 from .retry import retry_method
18
19 MOD = "mod"
20
21 def split(path):
22     """split(path) -> streamname, filename
23
24     Separate the stream name and file name in a /-separated stream path and
25     return a tuple (stream_name, file_name).  If no stream name is available,
26     assume '.'.
27
28     """
29     try:
30         stream_name, file_name = path.rsplit('/', 1)
31     except ValueError:  # No / in string
32         stream_name, file_name = '.', path
33     return stream_name, file_name
34
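# A hedged usage sketch of split(); the paths shown are hypothetical:
#
#   >>> split("data/reads/sample1.fastq")
#   ('data/reads', 'sample1.fastq')
#   >>> split("sample1.fastq")
#   ('.', 'sample1.fastq')
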
35 class _FileLikeObjectBase(object):
36     def __init__(self, name, mode):
37         self.name = name
38         self.mode = mode
39         self.closed = False
40
41     @staticmethod
42     def _before_close(orig_func):
43         @functools.wraps(orig_func)
44         def before_close_wrapper(self, *args, **kwargs):
45             if self.closed:
46                 raise ValueError("I/O operation on closed stream file")
47             return orig_func(self, *args, **kwargs)
48         return before_close_wrapper
49
50     def __enter__(self):
51         return self
52
53     def __exit__(self, exc_type, exc_value, traceback):
54         try:
55             self.close()
56         except Exception:
57             if exc_type is None:
58                 raise
59
60     def close(self):
61         self.closed = True
62
63
64 class ArvadosFileReaderBase(_FileLikeObjectBase):
65     def __init__(self, name, mode, num_retries=None):
66         super(ArvadosFileReaderBase, self).__init__(name, mode)
67         self._filepos = 0L
68         self.num_retries = num_retries
69         self._readline_cache = (None, None)
70
71     def __iter__(self):
72         while True:
73             data = self.readline()
74             if not data:
75                 break
76             yield data
77
78     def decompressed_name(self):
79         return re.sub(r'\.(bz2|gz)$', '', self.name)
80
81     @_FileLikeObjectBase._before_close
82     def seek(self, pos, whence=os.SEEK_SET):
83         if whence == os.SEEK_CUR:
84             pos += self._filepos
85         elif whence == os.SEEK_END:
86             pos += self.size()
87         self._filepos = min(max(pos, 0L), self.size())
88
89     def tell(self):
90         return self._filepos
91
92     @_FileLikeObjectBase._before_close
93     @retry_method
94     def readall(self, size=2**20, num_retries=None):
95         while True:
96             data = self.read(size, num_retries=num_retries)
97             if data == '':
98                 break
99             yield data
100
101     @_FileLikeObjectBase._before_close
102     @retry_method
103     def readline(self, size=float('inf'), num_retries=None):
104         cache_pos, cache_data = self._readline_cache
105         if self.tell() == cache_pos:
106             data = [cache_data]
107         else:
108             data = ['']
109         data_size = len(data[-1])
110         while (data_size < size) and ('\n' not in data[-1]):
111             next_read = self.read(2 ** 20, num_retries=num_retries)
112             if not next_read:
113                 break
114             data.append(next_read)
115             data_size += len(next_read)
116         data = ''.join(data)
117         try:
118             nextline_index = data.index('\n') + 1
119         except ValueError:
120             nextline_index = len(data)
121         nextline_index = min(nextline_index, size)
122         self._readline_cache = (self.tell(), data[nextline_index:])
123         return data[:nextline_index]
124
125     @_FileLikeObjectBase._before_close
126     @retry_method
127     def decompress(self, decompress, size, num_retries=None):
128         for segment in self.readall(size, num_retries):
129             data = decompress(segment)
130             if data:
131                 yield data
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def readall_decompressed(self, size=2**20, num_retries=None):
136         self.seek(0)
137         if self.name.endswith('.bz2'):
138             dc = bz2.BZ2Decompressor()
139             return self.decompress(dc.decompress, size,
140                                    num_retries=num_retries)
141         elif self.name.endswith('.gz'):
142             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
143             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
144                                    size, num_retries=num_retries)
145         else:
146             return self.readall(size, num_retries=num_retries)
147
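    # Hedged usage sketch: readall_decompressed() picks a decompressor from the
    # file name suffix.  "reader" stands for any concrete subclass instance
    # (e.g. StreamFileReader or ArvadosFileReader); process() is hypothetical.
    #
    #   for chunk in reader.readall_decompressed():
    #       process(chunk)   # decompressed data for *.gz / *.bz2, raw otherwise
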
148     @_FileLikeObjectBase._before_close
149     @retry_method
150     def readlines(self, sizehint=float('inf'), num_retries=None):
151         data = []
152         data_size = 0
153         for s in self.readall(num_retries=num_retries):
154             data.append(s)
155             data_size += len(s)
156             if data_size >= sizehint:
157                 break
158         return ''.join(data).splitlines(True)
159
160     def size(self):
161         raise NotImplementedError()
162
163     def read(self, size, num_retries=None):
164         raise NotImplementedError()
165
166     def readfrom(self, start, size, num_retries=None):
167         raise NotImplementedError()
168
169
170 class StreamFileReader(ArvadosFileReaderBase):
171     class _NameAttribute(str):
172         # The Python file API provides a plain .name attribute.
173         # Older SDK provided a name() method.
174         # This class provides both, for maximum compatibility.
175         def __call__(self):
176             return self
177
178     def __init__(self, stream, segments, name):
179         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
180         self._stream = stream
181         self.segments = segments
182
183     def stream_name(self):
184         return self._stream.name()
185
186     def size(self):
187         n = self.segments[-1]
188         return n.range_start + n.range_size
189
190     @_FileLikeObjectBase._before_close
191     @retry_method
192     def read(self, size, num_retries=None):
193         """Read up to 'size' bytes from the stream, starting at the current file position"""
194         if size == 0:
195             return ''
196
197         data = ''
198         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
199         if available_chunks:
200             lr = available_chunks[0]
201             data = self._stream.readfrom(lr.locator+lr.segment_offset,
202                                           lr.segment_size,
203                                           num_retries=num_retries)
204
205         self._filepos += len(data)
206         return data
207
208     @_FileLikeObjectBase._before_close
209     @retry_method
210     def readfrom(self, start, size, num_retries=None):
211         """Read up to 'size' bytes from the stream, starting at 'start'"""
212         if size == 0:
213             return ''
214
215         data = []
216         for lr in locators_and_ranges(self.segments, start, size):
217             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
218                                               num_retries=num_retries))
219         return ''.join(data)
220
221     def as_manifest(self):
222         segs = []
223         for r in self.segments:
224             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
225         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
226
227
228 def synchronized(orig_func):
229     @functools.wraps(orig_func)
230     def synchronized_wrapper(self, *args, **kwargs):
231         with self.lock:
232             return orig_func(self, *args, **kwargs)
233     return synchronized_wrapper
234
235 class _BufferBlock(object):
236     """A stand-in for a Keep block that is in the process of being written.
237
238     Writers can append to it, get the size, and compute the Keep locator.
239     There are three valid states:
240
241     WRITABLE
242       Can append to block.
243
244     PENDING
245       Block is in the process of being uploaded to Keep, append is an error.
246
247     COMMITTED
248       The block has been written to Keep and its internal buffer has been
249       released; fetching the block now goes through the Keep client (since
250       the internal copy was discarded), and identifiers referring to the
251       BufferBlock can be replaced with the block locator.
252
253     """
254
255     WRITABLE = 0
256     PENDING = 1
257     COMMITTED = 2
258
259     def __init__(self, blockid, starting_capacity, owner):
260         """
261         :blockid:
262           the identifier for this block
263
264         :starting_capacity:
265           the initial buffer capacity
266
267         :owner:
268           ArvadosFile that owns this block
269
270         """
271         self.blockid = blockid
272         self.buffer_block = bytearray(starting_capacity)
273         self.buffer_view = memoryview(self.buffer_block)
274         self.write_pointer = 0
275         self._state = _BufferBlock.WRITABLE
276         self._locator = None
277         self.owner = owner
278         self.lock = threading.Lock()
279
280     @synchronized
281     def append(self, data):
282         """Append some data to the buffer.
283
284         Only valid if the block is in WRITABLE state.  Implements an expanding
285         buffer, doubling capacity as needed to accommodate all the data.
286
287         """
288         if self._state == _BufferBlock.WRITABLE:
289             while (self.write_pointer+len(data)) > len(self.buffer_block):
290                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
291                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
292                 self.buffer_block = new_buffer_block
293                 self.buffer_view = memoryview(self.buffer_block)
294             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
295             self.write_pointer += len(data)
296             self._locator = None
297         else:
298             raise AssertionError("Buffer block is not writable")
299
300     @synchronized
301     def set_state(self, nextstate, loc=None):
302         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
303             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
304             self._state = nextstate
305             if self._state == _BufferBlock.COMMITTED:
306                 self._locator = loc
307                 self.buffer_view = None
308                 self.buffer_block = None
309         else:
310             raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
311
312     @synchronized
313     def state(self):
314         return self._state
315
316     def size(self):
317         """The amount of data written to the buffer."""
318         return self.write_pointer
319
320     @synchronized
321     def locator(self):
322         """The Keep locator for this buffer's contents."""
323         if self._locator is None:
324             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
325         return self._locator
326
327     @synchronized
328     def clone(self, new_blockid, owner):
329         if self._state == _BufferBlock.COMMITTED:
330             raise AssertionError("Can only duplicate a writable or pending buffer block")
331         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
332         bufferblock.append(self.buffer_view[0:self.size()])
333         return bufferblock
334
335
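# Hedged sketch of the _BufferBlock state machine described above.  The block
# id, data, and locator below are illustrative only; in normal use these
# transitions are driven by _BlockManager rather than called directly.
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello")                      # WRITABLE: buffer grows as needed
#   bb.set_state(_BufferBlock.PENDING)      # upload in progress, appends now fail
#   bb.set_state(_BufferBlock.COMMITTED,
#                "5d41402abc4b2a76b9719d911017c592+5")
#   bb.locator()                            # returns the committed Keep locator
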
336 class NoopLock(object):
337     def __enter__(self):
338         return self
339
340     def __exit__(self, exc_type, exc_value, traceback):
341         pass
342
343     def acquire(self, blocking=False):
344         pass
345
346     def release(self):
347         pass
348
349
350 def must_be_writable(orig_func):
351     @functools.wraps(orig_func)
352     def must_be_writable_wrapper(self, *args, **kwargs):
353         if not self.writable():
354             raise IOError(errno.EROFS, "Collection must be writable.")
355         return orig_func(self, *args, **kwargs)
356     return must_be_writable_wrapper
357
358
359 class _BlockManager(object):
360     """BlockManager handles buffer blocks.
361
362     Also handles background block uploads, and background block prefetch for a
363     Collection of ArvadosFiles.
364
365     """
366     def __init__(self, keep):
367         """keep: KeepClient object to use"""
368         self._keep = keep
369         self._bufferblocks = {}
370         self._put_queue = None
371         self._put_errors = None
372         self._put_threads = None
373         self._prefetch_queue = None
374         self._prefetch_threads = None
375         self.lock = threading.Lock()
376         self.prefetch_enabled = True
377         self.num_put_threads = 2
378         self.num_get_threads = 2
379
380     @synchronized
381     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
382         """Allocate a new, empty bufferblock in WRITABLE state and return it.
383
384         :blockid:
385           optional block identifier, otherwise one will be automatically assigned
386
387         :starting_capacity:
388           optional capacity, otherwise will use default capacity
389
390         :owner:
391           ArvadosFile that owns this block
392
393         """
394         if blockid is None:
395             blockid = "bufferblock%i" % len(self._bufferblocks)
396         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
397         self._bufferblocks[bufferblock.blockid] = bufferblock
398         return bufferblock
399
400     @synchronized
401     def dup_block(self, block, owner):
402         """Create a new bufferblock initialized with the content of an existing bufferblock.
403
404         :block:
405           the buffer block to copy.
406
407         :owner:
408           ArvadosFile that owns the new block
409
410         """
411         new_blockid = "bufferblock%i" % len(self._bufferblocks)
412         bufferblock = block.clone(new_blockid, owner)
413         self._bufferblocks[bufferblock.blockid] = bufferblock
414         return bufferblock
415
416     @synchronized
417     def is_bufferblock(self, locator):
418         return locator in self._bufferblocks
419
420     @synchronized
421     def stop_threads(self):
422         """Shut down and wait for background upload and download threads to finish."""
423
424         if self._put_threads is not None:
425             for t in self._put_threads:
426                 self._put_queue.put(None)
427             for t in self._put_threads:
428                 t.join()
429         self._put_threads = None
430         self._put_queue = None
431         self._put_errors = None
432
433         if self._prefetch_threads is not None:
434             for t in self._prefetch_threads:
435                 self._prefetch_queue.put(None)
436             for t in self._prefetch_threads:
437                 t.join()
438         self._prefetch_threads = None
439         self._prefetch_queue = None
440
441     def commit_bufferblock(self, block):
442         """Initiate a background upload of a bufferblock.
443
444         This will block if the upload queue is at capacity, otherwise it will
445         return immediately.
446
447         """
448
449         def commit_bufferblock_worker(self):
450             """Background uploader thread."""
451
452             while True:
453                 try:
454                     bufferblock = self._put_queue.get()
455                     if bufferblock is None:
456                         return
457                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
458                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
459
460                 except Exception as e:
461                     self._put_errors.put((bufferblock.locator(), e))
462                 finally:
463                     if self._put_queue is not None:
464                         self._put_queue.task_done()
465
466         with self.lock:
467             if self._put_threads is None:
468                 # Start uploader threads.
469
470                 # If we don't limit the Queue size, the upload queue can quickly
471                 # grow to take up gigabytes of RAM if the writing process is
472                 # generating data more quickly than it can be sent to the Keep
473                 # servers.
474                 #
475                 # With two upload threads and a queue size of 2, this means up to 4
476                 # blocks pending.  If they are full 64 MiB blocks, that means up to
477                 # 256 MiB of internal buffering, which is the same size as the
478                 # default download block cache in KeepClient.
479                 self._put_queue = Queue.Queue(maxsize=2)
480                 self._put_errors = Queue.Queue()
481
482                 self._put_threads = []
483                 for i in xrange(0, self.num_put_threads):
484                     thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
485                     self._put_threads.append(thread)
486                     thread.daemon = True
487                     thread.start()
488
489         # Mark the block as PENDING to disallow any further appends.
490         block.set_state(_BufferBlock.PENDING)
491         self._put_queue.put(block)
492
493     @synchronized
494     def get_bufferblock(self, locator):
495         return self._bufferblocks.get(locator)
496
497     def get_block_contents(self, locator, num_retries, cache_only=False):
498         """Fetch a block.
499
500         First checks whether the locator refers to a BufferBlock and, if so,
501         returns its contents; otherwise passes the request through to KeepClient.get().
502
503         """
504         with self.lock:
505             if locator in self._bufferblocks:
506                 bufferblock = self._bufferblocks[locator]
507                 if bufferblock.state() != _BufferBlock.COMMITTED:
508                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
509                 else:
510                     locator = bufferblock._locator
511         if cache_only:
512             return self._keep.get_from_cache(locator)
513         else:
514             return self._keep.get(locator, num_retries=num_retries)
515
516     def commit_all(self):
517         """Commit all outstanding buffer blocks.
518
519         Unlike commit_bufferblock(), this is a synchronous call, and will not
520         return until all buffer blocks are uploaded.  Raises
521         KeepWriteError() if any blocks failed to upload.
522
523         """
524         with self.lock:
525             items = self._bufferblocks.items()
526
527         for k,v in items:
528             if v.state() == _BufferBlock.WRITABLE:
529                 self.commit_bufferblock(v)
530
531         with self.lock:
532             if self._put_queue is not None:
533                 self._put_queue.join()
534
535                 if not self._put_errors.empty():
536                     err = []
537                     try:
538                         while True:
539                             err.append(self._put_errors.get(False))
540                     except Queue.Empty:
541                         pass
542                     raise KeepWriteError("Error writing some blocks", err, label="block")
543
544     def block_prefetch(self, locator):
545         """Initiate a background download of a block.
546
547         This assumes that the underlying KeepClient implements a block cache,
548         so repeated requests for the same block will not result in repeated
549         downloads (unless the block is evicted from the cache).  This method
550         does not block.
551
552         """
553
554         if not self.prefetch_enabled:
555             return
556
557         def block_prefetch_worker(self):
558             """The background downloader thread."""
559             while True:
560                 try:
561                     b = self._prefetch_queue.get()
562                     if b is None:
563                         return
564                     self._keep.get(b)
565                 except Exception:
566                     pass
567
568         with self.lock:
569             if locator in self._bufferblocks:
570                 return
571             if self._prefetch_threads is None:
572                 self._prefetch_queue = Queue.Queue()
573                 self._prefetch_threads = []
574                 for i in xrange(0, self.num_get_threads):
575                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
576                     self._prefetch_threads.append(thread)
577                     thread.daemon = True
578                     thread.start()
579         self._prefetch_queue.put(locator)
580
581
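# Hedged sketch of the _BlockManager write path.  "keep_client" is a
# hypothetical KeepClient instance; error handling is omitted.
#
#   bm = _BlockManager(keep_client)
#   bb = bm.alloc_bufferblock()
#   bb.append("some data")
#   bm.commit_all()      # uploads outstanding blocks, raises KeepWriteError on failure
#   bm.stop_threads()    # shut down background upload/prefetch workers
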
582 class ArvadosFile(object):
583     """Represent a file in a Collection.
584
585     ArvadosFile manages the underlying representation of a file in Keep as a
586     sequence of segments spanning a set of blocks, and implements random
587     read/write access.
588
589     This object may be accessed from multiple threads.
590
591     """
592
593     def __init__(self, parent, name, stream=[], segments=[]):
594         """
595         ArvadosFile constructor.
596
597         :stream:
598           a list of Range objects representing a block stream
599
600         :segments:
601           a list of Range objects representing segments
602         """
603         self.parent = parent
604         self._modified = True
605         self._segments = []
606         self.lock = parent.root_collection().lock
607         for s in segments:
608             self._add_segment(stream, s.locator, s.range_size)
609         self._current_bblock = None
610         self.name = name
611
612     def writable(self):
613         return self.parent.writable()
614
615     @synchronized
616     def segments(self):
617         return copy.copy(self._segments)
618
619     @synchronized
620     def clone(self, new_parent, new_name):
621         """Make a copy of this file."""
622         cp = ArvadosFile(new_parent, new_name)
623         cp.replace_contents(self)
624         return cp
625
626     @must_be_writable
627     @synchronized
628     def replace_contents(self, other):
629         """Replace segments of this file with segments from another `ArvadosFile` object."""
630
631         map_loc = {}
632         self._segments = []
633         for other_segment in other.segments():
634             new_loc = other_segment.locator
635             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
636                 if other_segment.locator not in map_loc:
637                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
638                     if bufferblock.state() != _BufferBlock.WRITABLE:
639                         map_loc[other_segment.locator] = bufferblock.locator()
640                     else:
641                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
642                 new_loc = map_loc[other_segment.locator]
643
644             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
645
646         self._modified = True
647
648     def __eq__(self, other):
649         if other is self:
650             return True
651         if not isinstance(other, ArvadosFile):
652             return False
653
654         othersegs = other.segments()
655         with self.lock:
656             if len(self._segments) != len(othersegs):
657                 return False
658             for i in xrange(0, len(othersegs)):
659                 seg1 = self._segments[i]
660                 seg2 = othersegs[i]
661                 loc1 = seg1.locator
662                 loc2 = seg2.locator
663
664                 if self.parent._my_block_manager().is_bufferblock(loc1):
665                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
666
667                 if other.parent._my_block_manager().is_bufferblock(loc2):
668                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
669
670                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
671                     seg1.range_start != seg2.range_start or
672                     seg1.range_size != seg2.range_size or
673                     seg1.segment_offset != seg2.segment_offset):
674                     return False
675
676         return True
677
678     def __ne__(self, other):
679         return not self.__eq__(other)
680
681     @synchronized
682     def set_unmodified(self):
683         """Clear the modified flag"""
684         self._modified = False
685
686     @synchronized
687     def modified(self):
688         """Test the modified flag"""
689         return self._modified
690
691     @must_be_writable
692     @synchronized
693     def truncate(self, size):
694         """Shrink the size of the file.
695
696         If `size` is less than the size of the file, the file contents after
697         `size` will be discarded.  If `size` is greater than the current size
698         of the file, an IOError will be raised.
699
700         """
701         if size < self.size():
702             new_segs = []
703             for r in self._segments:
704                 range_end = r.range_start+r.range_size
705                 if r.range_start >= size:
706                     # segment is past the truncate size, all done
707                     break
708                 elif size < range_end:
709                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
710                     nr.segment_offset = r.segment_offset
711                     new_segs.append(nr)
712                     break
713                 else:
714                     new_segs.append(r)
715
716             self._segments = new_segs
717             self._modified = True
718         elif size > self.size():
719             raise IOError("truncate() does not support extending the file size")
720
721     def readfrom(self, offset, size, num_retries):
722         """Read up to `size` bytes from the file starting at `offset`."""
723
724         with self.lock:
725             if size == 0 or offset >= self.size():
726                 return ''
727             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
728             readsegs = locators_and_ranges(self._segments, offset, size)
729
730         for lr in prefetch:
731             self.parent._my_block_manager().block_prefetch(lr.locator)
732
733         data = []
734         for lr in readsegs:
735             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
736             if block:
737                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
738             else:
739                 break
740         return ''.join(data)
741
742     def _repack_writes(self):
743         """Re-pack the buffer block if it holds more data than the segments reference.
744 
745         This happens when a buffered write overwrites a file range written by
746         a previous buffered write.  Re-pack the buffer block for efficiency
747         and to avoid leaking overwritten data that is no longer referenced.
748
749         """
750         segs = self._segments
751
752         # Sum up the segments to get the total bytes of the file referencing
753         # into the buffer block.
754         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
755         write_total = sum([s.range_size for s in bufferblock_segs])
756
757         if write_total < self._current_bblock.size():
758             # There is more data in the buffer block than is actually accounted for by segments, so
759             # re-pack into a new buffer by copying over to a new buffer block.
760             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
761             for t in bufferblock_segs:
762                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
763                 t.segment_offset = new_bb.size() - t.range_size
764
765             self._current_bblock = new_bb
766
767     @must_be_writable
768     @synchronized
769     def writeto(self, offset, data, num_retries):
770         """Write `data` to the file starting at `offset`.
771
772         This will update existing bytes and/or extend the size of the file as
773         necessary.
774
775         """
776         if len(data) == 0:
777             return
778
779         if offset > self.size():
780             raise ArgumentError("Offset is past the end of the file")
781
782         if len(data) > config.KEEP_BLOCK_SIZE:
783             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
784
785         self._modified = True
786
787         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
788             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
789
790         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
791             self._repack_writes()
792             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
793                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
794                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
795
796         self._current_bblock.append(data)
797
798         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
799
800     @synchronized
801     def flush(self):
802         if self._current_bblock:
803             self._repack_writes()
804             self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
805         self.parent.notify(MOD, self.parent, self.name, (self, self))
806
807     @must_be_writable
808     @synchronized
809     def add_segment(self, blocks, pos, size):
810         """Add a segment to the end of the file.
811
812         `pos` and `size` reference a section of the stream described by
813         `blocks` (a list of Range objects)
814
815         """
816         self._add_segment(blocks, pos, size)
817
818     def _add_segment(self, blocks, pos, size):
819         """Internal implementation of add_segment."""
820         self._modified = True
821         for lr in locators_and_ranges(blocks, pos, size):
822             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
823             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
824             self._segments.append(r)
825
826     @synchronized
827     def size(self):
828         """Get the file size."""
829         if self._segments:
830             n = self._segments[-1]
831             return n.range_start + n.range_size
832         else:
833             return 0
834
835     @synchronized
836     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
837         buf = ""
838         filestream = []
839         for segment in self._segments:
840             loc = segment.locator
841             if self.parent._my_block_manager().is_bufferblock(loc):
842                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
843             if portable_locators:
844                 loc = KeepLocator(loc).stripped()
845             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
846                                  segment.segment_offset, segment.range_size))
847         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
848         buf += "\n"
849         return buf
850
851
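# Hedged sketch of random access on an ArvadosFile.  "afile" stands for an
# instance obtained from a writable collection; offsets and data are made up.
#
#   afile.writeto(0, "abcdef", num_retries=2)   # write six bytes at offset 0
#   afile.writeto(2, "XY", num_retries=2)       # overwrite bytes 2-3
#   afile.readfrom(0, 6, num_retries=2)         # -> "abXYef"
#   afile.truncate(4)                           # keep only the first 4 bytes
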
852 class ArvadosFileReader(ArvadosFileReaderBase):
853     """Wraps ArvadosFile in a file-like object supporting reading only.
854
855     Be aware that this class is NOT thread safe, as there is no locking
856     around updates to the file pointer.
857
858     """
859
860     def __init__(self, arvadosfile,  mode="r", num_retries=None):
861         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
862         self.arvadosfile = arvadosfile
863
864     def size(self):
865         return self.arvadosfile.size()
866
867     def stream_name(self):
868         return self.arvadosfile.parent.stream_name()
869
870     @_FileLikeObjectBase._before_close
871     @retry_method
872     def read(self, size, num_retries=None):
873         """Read up to `size` bytes from the stream, starting at the current file position."""
874         data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
875         self._filepos += len(data)
876         return data
877
878     @_FileLikeObjectBase._before_close
879     @retry_method
880     def readfrom(self, offset, size, num_retries=None):
881         """Read up to `size` bytes from the stream, starting at `offset`."""
882         return self.arvadosfile.readfrom(offset, size, num_retries)
883
884     def flush(self):
885         pass
886
887
888 class ArvadosFileWriter(ArvadosFileReader):
889     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
890
891     Be aware that this class is NOT thread safe, as there is no locking
892     around updates to the file pointer.
893
894     """
895
896     def __init__(self, arvadosfile, mode, num_retries=None):
897         super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
898
899     @_FileLikeObjectBase._before_close
900     @retry_method
901     def write(self, data, num_retries=None):
902         if self.mode[0] == "a":
903             self.arvadosfile.writeto(self.size(), data, num_retries)
904         else:
905             self.arvadosfile.writeto(self._filepos, data, num_retries)
906             self._filepos += len(data)
907
908     @_FileLikeObjectBase._before_close
909     @retry_method
910     def writelines(self, seq, num_retries=None):
911         for s in seq:
912             self.write(s, num_retries)
913
914     @_FileLikeObjectBase._before_close
915     def truncate(self, size=None):
916         if size is None:
917             size = self._filepos
918         self.arvadosfile.truncate(size)
919         if self._filepos > self.size():
920             self._filepos = self.size()
921
922     @_FileLikeObjectBase._before_close
923     def flush(self):
924         self.arvadosfile.flush()
925
926     def close(self):
927         if not self.closed:
928             self.flush()
929             super(ArvadosFileWriter, self).close()
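
# Hedged end-to-end sketch.  "collection" stands for a hypothetical writable
# arvados.collection.Collection whose open() hands out the wrappers above.
#
#   with collection.open("output.txt", "w") as writer:   # ArvadosFileWriter
#       writer.write("hello world\n")
#   with collection.open("output.txt", "r") as reader:   # ArvadosFileReader
#       print reader.read(2**20)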