Merge branch '5476-connect-timeout-scale' closes #5476
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12
13 from .errors import KeepWriteError, AssertionError
14 from .keep import KeepLocator
15 from ._normalize_stream import normalize_stream
16 from ._ranges import locators_and_ranges, replace_range, Range
17 from .retry import retry_method
18
19 def split(path):
20     """split(path) -> streamname, filename
21
22     Separate the stream name and file name in a /-separated stream path and
23     return a tuple (stream_name, file_name).  If no stream name is available,
24     assume '.'.
25
26     """
27     try:
28         stream_name, file_name = path.rsplit('/', 1)
29     except ValueError:  # No / in string
30         stream_name, file_name = '.', path
31     return stream_name, file_name
32
33 class _FileLikeObjectBase(object):
34     def __init__(self, name, mode):
35         self.name = name
36         self.mode = mode
37         self.closed = False
38
39     @staticmethod
40     def _before_close(orig_func):
41         @functools.wraps(orig_func)
42         def before_close_wrapper(self, *args, **kwargs):
43             if self.closed:
44                 raise ValueError("I/O operation on closed stream file")
45             return orig_func(self, *args, **kwargs)
46         return before_close_wrapper
47
48     def __enter__(self):
49         return self
50
51     def __exit__(self, exc_type, exc_value, traceback):
52         try:
53             self.close()
54         except Exception:
55             if exc_type is None:
56                 raise
57
58     def close(self):
59         self.closed = True
60
61
62 class ArvadosFileReaderBase(_FileLikeObjectBase):
63     def __init__(self, name, mode, num_retries=None):
64         super(ArvadosFileReaderBase, self).__init__(name, mode)
65         self._filepos = 0L
66         self.num_retries = num_retries
67         self._readline_cache = (None, None)
68
69     def __iter__(self):
70         while True:
71             data = self.readline()
72             if not data:
73                 break
74             yield data
75
76     def decompressed_name(self):
77         return re.sub('\.(bz2|gz)$', '', self.name)
78
79     @_FileLikeObjectBase._before_close
80     def seek(self, pos, whence=os.SEEK_CUR):
81         if whence == os.SEEK_CUR:
82             pos += self._filepos
83         elif whence == os.SEEK_END:
84             pos += self.size()
85         self._filepos = min(max(pos, 0L), self.size())
86
87     def tell(self):
88         return self._filepos
89
90     @_FileLikeObjectBase._before_close
91     @retry_method
92     def readall(self, size=2**20, num_retries=None):
93         while True:
94             data = self.read(size, num_retries=num_retries)
95             if data == '':
96                 break
97             yield data
98
99     @_FileLikeObjectBase._before_close
100     @retry_method
101     def readline(self, size=float('inf'), num_retries=None):
102         cache_pos, cache_data = self._readline_cache
103         if self.tell() == cache_pos:
104             data = [cache_data]
105         else:
106             data = ['']
107         data_size = len(data[-1])
108         while (data_size < size) and ('\n' not in data[-1]):
109             next_read = self.read(2 ** 20, num_retries=num_retries)
110             if not next_read:
111                 break
112             data.append(next_read)
113             data_size += len(next_read)
114         data = ''.join(data)
115         try:
116             nextline_index = data.index('\n') + 1
117         except ValueError:
118             nextline_index = len(data)
119         nextline_index = min(nextline_index, size)
120         self._readline_cache = (self.tell(), data[nextline_index:])
121         return data[:nextline_index]
122
123     @_FileLikeObjectBase._before_close
124     @retry_method
125     def decompress(self, decompress, size, num_retries=None):
126         for segment in self.readall(size, num_retries):
127             data = decompress(segment)
128             if data:
129                 yield data
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def readall_decompressed(self, size=2**20, num_retries=None):
134         self.seek(0)
135         if self.name.endswith('.bz2'):
136             dc = bz2.BZ2Decompressor()
137             return self.decompress(dc.decompress, size,
138                                    num_retries=num_retries)
139         elif self.name.endswith('.gz'):
140             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
141             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
142                                    size, num_retries=num_retries)
143         else:
144             return self.readall(size, num_retries=num_retries)
145
146     @_FileLikeObjectBase._before_close
147     @retry_method
148     def readlines(self, sizehint=float('inf'), num_retries=None):
149         data = []
150         data_size = 0
151         for s in self.readall(num_retries=num_retries):
152             data.append(s)
153             data_size += len(s)
154             if data_size >= sizehint:
155                 break
156         return ''.join(data).splitlines(True)
157
158     def size(self):
159         raise NotImplementedError()
160
161     def read(self, size, num_retries=None):
162         raise NotImplementedError()
163
164     def readfrom(self, start, size, num_retries=None):
165         raise NotImplementedError()
166
167
168 class StreamFileReader(ArvadosFileReaderBase):
169     class _NameAttribute(str):
170         # The Python file API provides a plain .name attribute.
171         # Older SDK provided a name() method.
172         # This class provides both, for maximum compatibility.
173         def __call__(self):
174             return self
175
176     def __init__(self, stream, segments, name):
177         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
178         self._stream = stream
179         self.segments = segments
180
181     def stream_name(self):
182         return self._stream.name()
183
184     def size(self):
185         n = self.segments[-1]
186         return n.range_start + n.range_size
187
188     @_FileLikeObjectBase._before_close
189     @retry_method
190     def read(self, size, num_retries=None):
191         """Read up to 'size' bytes from the stream, starting at the current file position"""
192         if size == 0:
193             return ''
194
195         data = ''
196         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
197         if available_chunks:
198             lr = available_chunks[0]
199             data = self._stream.readfrom(lr.locator+lr.segment_offset,
200                                           lr.segment_size,
201                                           num_retries=num_retries)
202
203         self._filepos += len(data)
204         return data
205
206     @_FileLikeObjectBase._before_close
207     @retry_method
208     def readfrom(self, start, size, num_retries=None):
209         """Read up to 'size' bytes from the stream, starting at 'start'"""
210         if size == 0:
211             return ''
212
213         data = []
214         for lr in locators_and_ranges(self.segments, start, size):
215             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
216                                               num_retries=num_retries))
217         return ''.join(data)
218
219     def as_manifest(self):
220         segs = []
221         for r in self.segments:
222             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
223         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
224
225
226 def synchronized(orig_func):
227     @functools.wraps(orig_func)
228     def synchronized_wrapper(self, *args, **kwargs):
229         with self.lock:
230             return orig_func(self, *args, **kwargs)
231     return synchronized_wrapper
232
233 class _BufferBlock(object):
234     """A stand-in for a Keep block that is in the process of being written.
235
236     Writers can append to it, get the size, and compute the Keep locator.
237     There are three valid states:
238
239     WRITABLE
240       Can append to block.
241
242     PENDING
243       Block is in the process of being uploaded to Keep, append is an error.
244
245     COMMITTED
246       The block has been written to Keep, its internal buffer has been
247       released, fetching the block will fetch it via keep client (since we
248       discarded the internal copy), and identifiers referring to the BufferBlock
249       can be replaced with the block locator.
250
251     """
252
253     WRITABLE = 0
254     PENDING = 1
255     COMMITTED = 2
256
257     def __init__(self, blockid, starting_capacity, owner):
258         """
259         :blockid:
260           the identifier for this block
261
262         :starting_capacity:
263           the initial buffer capacity
264
265         :owner:
266           ArvadosFile that owns this block
267
268         """
269         self.blockid = blockid
270         self.buffer_block = bytearray(starting_capacity)
271         self.buffer_view = memoryview(self.buffer_block)
272         self.write_pointer = 0
273         self._state = _BufferBlock.WRITABLE
274         self._locator = None
275         self.owner = owner
276         self.lock = threading.Lock()
277
278     @synchronized
279     def append(self, data):
280         """Append some data to the buffer.
281
282         Only valid if the block is in WRITABLE state.  Implements an expanding
283         buffer, doubling capacity as needed to accomdate all the data.
284
285         """
286         if self._state == _BufferBlock.WRITABLE:
287             while (self.write_pointer+len(data)) > len(self.buffer_block):
288                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
289                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
290                 self.buffer_block = new_buffer_block
291                 self.buffer_view = memoryview(self.buffer_block)
292             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
293             self.write_pointer += len(data)
294             self._locator = None
295         else:
296             raise AssertionError("Buffer block is not writable")
297
298     @synchronized
299     def set_state(self, nextstate, loc=None):
300         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
301             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
302             self._state = nextstate
303             if self._state == _BufferBlock.COMMITTED:
304                 self._locator = loc
305                 self.buffer_view = None
306                 self.buffer_block = None
307         else:
308             raise AssertionError("Invalid state change from %s to %s" % (self.state, state))
309
310     @synchronized
311     def state(self):
312         return self._state
313
314     def size(self):
315         """The amount of data written to the buffer."""
316         return self.write_pointer
317
318     @synchronized
319     def locator(self):
320         """The Keep locator for this buffer's contents."""
321         if self._locator is None:
322             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
323         return self._locator
324
325     @synchronized
326     def clone(self, new_blockid, owner):
327         if self._state == _BufferBlock.COMMITTED:
328             raise AssertionError("Can only duplicate a writable or pending buffer block")
329         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
330         bufferblock.append(self.buffer_view[0:self.size()])
331         return bufferblock
332
333
334 class NoopLock(object):
335     def __enter__(self):
336         return self
337
338     def __exit__(self, exc_type, exc_value, traceback):
339         pass
340
341     def acquire(self, blocking=False):
342         pass
343
344     def release(self):
345         pass
346
347
348 def must_be_writable(orig_func):
349     @functools.wraps(orig_func)
350     def must_be_writable_wrapper(self, *args, **kwargs):
351         if not self.writable():
352             raise IOError((errno.EROFS, "Collection must be writable."))
353         return orig_func(self, *args, **kwargs)
354     return must_be_writable_wrapper
355
356
357 class _BlockManager(object):
358     """BlockManager handles buffer blocks.
359
360     Also handles background block uploads, and background block prefetch for a
361     Collection of ArvadosFiles.
362
363     """
364     def __init__(self, keep):
365         """keep: KeepClient object to use"""
366         self._keep = keep
367         self._bufferblocks = {}
368         self._put_queue = None
369         self._put_errors = None
370         self._put_threads = None
371         self._prefetch_queue = None
372         self._prefetch_threads = None
373         self.lock = threading.Lock()
374         self.prefetch_enabled = True
375         self.num_put_threads = 2
376         self.num_get_threads = 2
377
378     @synchronized
379     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
380         """Allocate a new, empty bufferblock in WRITABLE state and return it.
381
382         :blockid:
383           optional block identifier, otherwise one will be automatically assigned
384
385         :starting_capacity:
386           optional capacity, otherwise will use default capacity
387
388         :owner:
389           ArvadosFile that owns this block
390
391         """
392         if blockid is None:
393             blockid = "bufferblock%i" % len(self._bufferblocks)
394         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
395         self._bufferblocks[bufferblock.blockid] = bufferblock
396         return bufferblock
397
398     @synchronized
399     def dup_block(self, block, owner):
400         """Create a new bufferblock initialized with the content of an existing bufferblock.
401
402         :block:
403           the buffer block to copy.
404
405         :owner:
406           ArvadosFile that owns the new block
407
408         """
409         new_blockid = "bufferblock%i" % len(self._bufferblocks)
410         bufferblock = block.clone(new_blockid, owner)
411         self._bufferblocks[bufferblock.blockid] = bufferblock
412         return bufferblock
413
414     @synchronized
415     def is_bufferblock(self, locator):
416         return locator in self._bufferblocks
417
418     @synchronized
419     def stop_threads(self):
420         """Shut down and wait for background upload and download threads to finish."""
421
422         if self._put_threads is not None:
423             for t in self._put_threads:
424                 self._put_queue.put(None)
425             for t in self._put_threads:
426                 t.join()
427         self._put_threads = None
428         self._put_queue = None
429         self._put_errors = None
430
431         if self._prefetch_threads is not None:
432             for t in self._prefetch_threads:
433                 self._prefetch_queue.put(None)
434             for t in self._prefetch_threads:
435                 t.join()
436         self._prefetch_threads = None
437         self._prefetch_queue = None
438
439     def commit_bufferblock(self, block):
440         """Initiate a background upload of a bufferblock.
441
442         This will block if the upload queue is at capacity, otherwise it will
443         return immediately.
444
445         """
446
447         def commit_bufferblock_worker(self):
448             """Background uploader thread."""
449
450             while True:
451                 try:
452                     bufferblock = self._put_queue.get()
453                     if bufferblock is None:
454                         return
455                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
456                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
457
458                 except Exception as e:
459                     self._put_errors.put((bufferblock.locator(), e))
460                 finally:
461                     if self._put_queue is not None:
462                         self._put_queue.task_done()
463
464         with self.lock:
465             if self._put_threads is None:
466                 # Start uploader threads.
467
468                 # If we don't limit the Queue size, the upload queue can quickly
469                 # grow to take up gigabytes of RAM if the writing process is
470                 # generating data more quickly than it can be send to the Keep
471                 # servers.
472                 #
473                 # With two upload threads and a queue size of 2, this means up to 4
474                 # blocks pending.  If they are full 64 MiB blocks, that means up to
475                 # 256 MiB of internal buffering, which is the same size as the
476                 # default download block cache in KeepClient.
477                 self._put_queue = Queue.Queue(maxsize=2)
478                 self._put_errors = Queue.Queue()
479
480                 self._put_threads = []
481                 for i in xrange(0, self.num_put_threads):
482                     thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
483                     self._put_threads.append(thread)
484                     thread.daemon = True
485                     thread.start()
486
487         # Mark the block as PENDING so to disallow any more appends.
488         block.set_state(_BufferBlock.PENDING)
489         self._put_queue.put(block)
490
491     @synchronized
492     def get_bufferblock(self, locator):
493         return self._bufferblocks.get(locator)
494
495     def get_block_contents(self, locator, num_retries, cache_only=False):
496         """Fetch a block.
497
498         First checks to see if the locator is a BufferBlock and return that, if
499         not, passes the request through to KeepClient.get().
500
501         """
502         with self.lock:
503             if locator in self._bufferblocks:
504                 bufferblock = self._bufferblocks[locator]
505                 if bufferblock.state() != _BufferBlock.COMMITTED:
506                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
507                 else:
508                     locator = bufferblock._locator
509         if cache_only:
510             return self._keep.get_from_cache(locator)
511         else:
512             return self._keep.get(locator, num_retries=num_retries)
513
514     def commit_all(self):
515         """Commit all outstanding buffer blocks.
516
517         Unlike commit_bufferblock(), this is a synchronous call, and will not
518         return until all buffer blocks are uploaded.  Raises
519         KeepWriteError() if any blocks failed to upload.
520
521         """
522         with self.lock:
523             items = self._bufferblocks.items()
524
525         for k,v in items:
526             if v.state() == _BufferBlock.WRITABLE:
527                 self.commit_bufferblock(v)
528
529         with self.lock:
530             if self._put_queue is not None:
531                 self._put_queue.join()
532
533                 if not self._put_errors.empty():
534                     err = []
535                     try:
536                         while True:
537                             err.append(self._put_errors.get(False))
538                     except Queue.Empty:
539                         pass
540                     raise KeepWriteError("Error writing some blocks", err, label="block")
541
542     def block_prefetch(self, locator):
543         """Initiate a background download of a block.
544
545         This assumes that the underlying KeepClient implements a block cache,
546         so repeated requests for the same block will not result in repeated
547         downloads (unless the block is evicted from the cache.)  This method
548         does not block.
549
550         """
551
552         if not self.prefetch_enabled:
553             return
554
555         def block_prefetch_worker(self):
556             """The background downloader thread."""
557             while True:
558                 try:
559                     b = self._prefetch_queue.get()
560                     if b is None:
561                         return
562                     self._keep.get(b)
563                 except Exception:
564                     pass
565
566         with self.lock:
567             if locator in self._bufferblocks:
568                 return
569             if self._prefetch_threads is None:
570                 self._prefetch_queue = Queue.Queue()
571                 self._prefetch_threads = []
572                 for i in xrange(0, self.num_get_threads):
573                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
574                     self._prefetch_threads.append(thread)
575                     thread.daemon = True
576                     thread.start()
577         self._prefetch_queue.put(locator)
578
579
580 class ArvadosFile(object):
581     """Represent a file in a Collection.
582
583     ArvadosFile manages the underlying representation of a file in Keep as a
584     sequence of segments spanning a set of blocks, and implements random
585     read/write access.
586
587     This object may be accessed from multiple threads.
588
589     """
590
591     def __init__(self, parent, stream=[], segments=[]):
592         """
593         ArvadosFile constructor.
594
595         :stream:
596           a list of Range objects representing a block stream
597
598         :segments:
599           a list of Range objects representing segments
600         """
601         self.parent = parent
602         self._modified = True
603         self._segments = []
604         self.lock = parent.root_collection().lock
605         for s in segments:
606             self._add_segment(stream, s.locator, s.range_size)
607         self._current_bblock = None
608
609     def writable(self):
610         return self.parent.writable()
611
612     @synchronized
613     def segments(self):
614         return copy.copy(self._segments)
615
616     @synchronized
617     def clone(self, new_parent):
618         """Make a copy of this file."""
619         cp = ArvadosFile(new_parent)
620         cp.replace_contents(self)
621         return cp
622
623     @must_be_writable
624     @synchronized
625     def replace_contents(self, other):
626         """Replace segments of this file with segments from another `ArvadosFile` object."""
627
628         map_loc = {}
629         self._segments = []
630         for other_segment in other.segments():
631             new_loc = other_segment.locator
632             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
633                 if other_segment.locator not in map_loc:
634                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
635                     if bufferblock.state() != _BufferBlock.WRITABLE:
636                         map_loc[other_segment.locator] = bufferblock.locator()
637                     else:
638                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
639                 new_loc = map_loc[other_segment.locator]
640
641             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
642
643         self._modified = True
644
645     def __eq__(self, other):
646         if other is self:
647             return True
648         if not isinstance(other, ArvadosFile):
649             return False
650
651         othersegs = other.segments()
652         with self.lock:
653             if len(self._segments) != len(othersegs):
654                 return False
655             for i in xrange(0, len(othersegs)):
656                 seg1 = self._segments[i]
657                 seg2 = othersegs[i]
658                 loc1 = seg1.locator
659                 loc2 = seg2.locator
660
661                 if self.parent._my_block_manager().is_bufferblock(loc1):
662                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
663
664                 if other.parent._my_block_manager().is_bufferblock(loc2):
665                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
666
667                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
668                     seg1.range_start != seg2.range_start or
669                     seg1.range_size != seg2.range_size or
670                     seg1.segment_offset != seg2.segment_offset):
671                     return False
672
673         return True
674
675     def __ne__(self, other):
676         return not self.__eq__(other)
677
678     @synchronized
679     def set_unmodified(self):
680         """Clear the modified flag"""
681         self._modified = False
682
683     @synchronized
684     def modified(self):
685         """Test the modified flag"""
686         return self._modified
687
688     @must_be_writable
689     @synchronized
690     def truncate(self, size):
691         """Shrink the size of the file.
692
693         If `size` is less than the size of the file, the file contents after
694         `size` will be discarded.  If `size` is greater than the current size
695         of the file, an IOError will be raised.
696
697         """
698         if size < self.size():
699             new_segs = []
700             for r in self._segments:
701                 range_end = r.range_start+r.range_size
702                 if r.range_start >= size:
703                     # segment is past the trucate size, all done
704                     break
705                 elif size < range_end:
706                     nr = Range(r.locator, r.range_start, size - r.range_start)
707                     nr.segment_offset = r.segment_offset
708                     new_segs.append(nr)
709                     break
710                 else:
711                     new_segs.append(r)
712
713             self._segments = new_segs
714             self._modified = True
715         elif size > self.size():
716             raise IOError("truncate() does not support extending the file size")
717
718     def readfrom(self, offset, size, num_retries):
719         """Read upto `size` bytes from the file starting at `offset`."""
720
721         with self.lock:
722             if size == 0 or offset >= self.size():
723                 return ''
724             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
725             readsegs = locators_and_ranges(self._segments, offset, size)
726
727         for lr in prefetch:
728             self.parent._my_block_manager().block_prefetch(lr.locator)
729
730         data = []
731         for lr in readsegs:
732             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
733             if block:
734                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
735             else:
736                 break
737         return ''.join(data)
738
739     def _repack_writes(self):
740         """Test if the buffer block has more data than actual segments.
741
742         This happens when a buffered write over-writes a file range written in
743         a previous buffered write.  Re-pack the buffer block for efficiency
744         and to avoid leaking information.
745
746         """
747         segs = self._segments
748
749         # Sum up the segments to get the total bytes of the file referencing
750         # into the buffer block.
751         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
752         write_total = sum([s.range_size for s in bufferblock_segs])
753
754         if write_total < self._current_bblock.size():
755             # There is more data in the buffer block than is actually accounted for by segments, so
756             # re-pack into a new buffer by copying over to a new buffer block.
757             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
758             for t in bufferblock_segs:
759                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
760                 t.segment_offset = new_bb.size() - t.range_size
761
762             self._current_bblock = new_bb
763
764     @must_be_writable
765     @synchronized
766     def writeto(self, offset, data, num_retries):
767         """Write `data` to the file starting at `offset`.
768
769         This will update existing bytes and/or extend the size of the file as
770         necessary.
771
772         """
773         if len(data) == 0:
774             return
775
776         if offset > self.size():
777             raise ArgumentError("Offset is past the end of the file")
778
779         if len(data) > config.KEEP_BLOCK_SIZE:
780             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
781
782         self._modified = True
783
784         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
785             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
786
787         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
788             self._repack_writes()
789             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
790                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
791                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
792
793         self._current_bblock.append(data)
794
795         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
796
797     @synchronized
798     def flush(self):
799         if self._current_bblock:
800             self._repack_writes()
801             self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
802
803     @must_be_writable
804     @synchronized
805     def add_segment(self, blocks, pos, size):
806         """Add a segment to the end of the file.
807
808         `pos` and `offset` reference a section of the stream described by
809         `blocks` (a list of Range objects)
810
811         """
812         self._add_segment(blocks, pos, size)
813
814     def _add_segment(self, blocks, pos, size):
815         """Internal implementation of add_segment."""
816         self._modified = True
817         for lr in locators_and_ranges(blocks, pos, size):
818             last = self._segments[-1] if self._segments else Range(0, 0, 0)
819             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
820             self._segments.append(r)
821
822     @synchronized
823     def size(self):
824         """Get the file size."""
825         if self._segments:
826             n = self._segments[-1]
827             return n.range_start + n.range_size
828         else:
829             return 0
830
831     @synchronized
832     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
833         buf = ""
834         filestream = []
835         for segment in self.segments:
836             loc = segment.locator
837             if loc.startswith("bufferblock"):
838                 loc = self._bufferblocks[loc].calculate_locator()
839             if portable_locators:
840                 loc = KeepLocator(loc).stripped()
841             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
842                                  segment.segment_offset, segment.range_size))
843         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
844         buf += "\n"
845         return buf
846
847
848 class ArvadosFileReader(ArvadosFileReaderBase):
849     """Wraps ArvadosFile in a file-like object supporting reading only.
850
851     Be aware that this class is NOT thread safe as there is no locking around
852     updating file pointer.
853
854     """
855
856     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
857         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
858         self.arvadosfile = arvadosfile
859
860     def size(self):
861         return self.arvadosfile.size()
862
863     def stream_name(self):
864         return self.arvadosfile.parent.stream_name()
865
866     @_FileLikeObjectBase._before_close
867     @retry_method
868     def read(self, size, num_retries=None):
869         """Read up to `size` bytes from the stream, starting at the current file position."""
870         data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
871         self._filepos += len(data)
872         return data
873
874     @_FileLikeObjectBase._before_close
875     @retry_method
876     def readfrom(self, offset, size, num_retries=None):
877         """Read up to `size` bytes from the stream, starting at the current file position."""
878         return self.arvadosfile.readfrom(offset, size, num_retries)
879
880     def flush(self):
881         pass
882
883
884 class ArvadosFileWriter(ArvadosFileReader):
885     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
886
887     Be aware that this class is NOT thread safe as there is no locking around
888     updating file pointer.
889
890     """
891
892     def __init__(self, arvadosfile, name, mode, num_retries=None):
893         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
894
895     @_FileLikeObjectBase._before_close
896     @retry_method
897     def write(self, data, num_retries=None):
898         if self.mode[0] == "a":
899             self.arvadosfile.writeto(self.size(), data, num_retries)
900         else:
901             self.arvadosfile.writeto(self._filepos, data, num_retries)
902             self._filepos += len(data)
903
904     @_FileLikeObjectBase._before_close
905     @retry_method
906     def writelines(self, seq, num_retries=None):
907         for s in seq:
908             self.write(s, num_retries)
909
910     @_FileLikeObjectBase._before_close
911     def truncate(self, size=None):
912         if size is None:
913             size = self._filepos
914         self.arvadosfile.truncate(size)
915         if self._filepos > self.size():
916             self._filepos = self.size()
917
918     @_FileLikeObjectBase._before_close
919     def flush(self):
920         self.arvadosfile.flush()
921
922     def close(self):
923         if not self.closed:
924             self.flush()
925             super(ArvadosFileWriter, self).close()