[arvados.git] sdk/python/arvados/arvfile.py (commit eb17bd4e3d70226a1be962d0c0ce2b0b3def33ee)
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
18 from .retry import retry_method
19
20 MOD = "mod"
21
22 _logger = logging.getLogger('arvados.arvfile')
23
24 def split(path):
25     """split(path) -> streamname, filename
26
27     Separate the stream name and file name in a /-separated stream path and
28     return a tuple (stream_name, file_name).  If no stream name is available,
29     assume '.'.
30
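    For example (the paths below are purely illustrative):

        >>> split('parent_dir/sub/file.txt')
        ('parent_dir/sub', 'file.txt')
        >>> split('file.txt')
        ('.', 'file.txt')
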
31     """
32     try:
33         stream_name, file_name = path.rsplit('/', 1)
34     except ValueError:  # No / in string
35         stream_name, file_name = '.', path
36     return stream_name, file_name
37
38 class _FileLikeObjectBase(object):
39     def __init__(self, name, mode):
40         self.name = name
41         self.mode = mode
42         self.closed = False
43
44     @staticmethod
45     def _before_close(orig_func):
46         @functools.wraps(orig_func)
47         def before_close_wrapper(self, *args, **kwargs):
48             if self.closed:
49                 raise ValueError("I/O operation on closed stream file")
50             return orig_func(self, *args, **kwargs)
51         return before_close_wrapper
52
53     def __enter__(self):
54         return self
55
56     def __exit__(self, exc_type, exc_value, traceback):
57         try:
58             self.close()
59         except Exception:
60             if exc_type is None:
61                 raise
62
63     def close(self):
64         self.closed = True
65
66
67 class ArvadosFileReaderBase(_FileLikeObjectBase):
68     def __init__(self, name, mode, num_retries=None):
69         super(ArvadosFileReaderBase, self).__init__(name, mode)
70         self._filepos = 0L
71         self.num_retries = num_retries
72         self._readline_cache = (None, None)
73
74     def __iter__(self):
75         while True:
76             data = self.readline()
77             if not data:
78                 break
79             yield data
80
81     def decompressed_name(self):
82         return re.sub(r'\.(bz2|gz)$', '', self.name)
83
84     @_FileLikeObjectBase._before_close
85     def seek(self, pos, whence=os.SEEK_SET):
86         if whence == os.SEEK_CUR:
87             pos += self._filepos
88         elif whence == os.SEEK_END:
89             pos += self.size()
90         self._filepos = min(max(pos, 0L), self.size())
91
92     def tell(self):
93         return self._filepos
94
95     @_FileLikeObjectBase._before_close
96     @retry_method
97     def readall(self, size=2**20, num_retries=None):
98         while True:
99             data = self.read(size, num_retries=num_retries)
100             if data == '':
101                 break
102             yield data
103
104     @_FileLikeObjectBase._before_close
105     @retry_method
106     def readline(self, size=float('inf'), num_retries=None):
107         cache_pos, cache_data = self._readline_cache
108         if self.tell() == cache_pos:
109             data = [cache_data]
110         else:
111             data = ['']
112         data_size = len(data[-1])
113         while (data_size < size) and ('\n' not in data[-1]):
114             next_read = self.read(2 ** 20, num_retries=num_retries)
115             if not next_read:
116                 break
117             data.append(next_read)
118             data_size += len(next_read)
119         data = ''.join(data)
120         try:
121             nextline_index = data.index('\n') + 1
122         except ValueError:
123             nextline_index = len(data)
124         nextline_index = min(nextline_index, size)
125         self._readline_cache = (self.tell(), data[nextline_index:])
126         return data[:nextline_index]
127
128     @_FileLikeObjectBase._before_close
129     @retry_method
130     def decompress(self, decompress, size, num_retries=None):
131         for segment in self.readall(size, num_retries):
132             data = decompress(segment)
133             if data:
134                 yield data
135
136     @_FileLikeObjectBase._before_close
137     @retry_method
138     def readall_decompressed(self, size=2**20, num_retries=None):
139         self.seek(0)
140         if self.name.endswith('.bz2'):
141             dc = bz2.BZ2Decompressor()
142             return self.decompress(dc.decompress, size,
143                                    num_retries=num_retries)
144         elif self.name.endswith('.gz'):
145             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
146             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
147                                    size, num_retries=num_retries)
148         else:
149             return self.readall(size, num_retries=num_retries)
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def readlines(self, sizehint=float('inf'), num_retries=None):
154         data = []
155         data_size = 0
156         for s in self.readall(num_retries=num_retries):
157             data.append(s)
158             data_size += len(s)
159             if data_size >= sizehint:
160                 break
161         return ''.join(data).splitlines(True)
162
163     def size(self):
164         raise NotImplementedError()
165
166     def read(self, size, num_retries=None):
167         raise NotImplementedError()
168
169     def readfrom(self, start, size, num_retries=None):
170         raise NotImplementedError()
171
172
173 class StreamFileReader(ArvadosFileReaderBase):
174     class _NameAttribute(str):
175         # The Python file API provides a plain .name attribute.
176         # Older SDK provided a name() method.
177         # This class provides both, for maximum compatibility.
178         def __call__(self):
179             return self
180
181     def __init__(self, stream, segments, name):
182         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
183         self._stream = stream
184         self.segments = segments
185
186     def stream_name(self):
187         return self._stream.name()
188
189     def size(self):
190         n = self.segments[-1]
191         return n.range_start + n.range_size
192
193     @_FileLikeObjectBase._before_close
194     @retry_method
195     def read(self, size, num_retries=None):
196         """Read up to 'size' bytes from the stream, starting at the current file position"""
197         if size == 0:
198             return ''
199
200         data = ''
201         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
202         if available_chunks:
203             lr = available_chunks[0]
204             data = self._stream.readfrom(lr.locator+lr.segment_offset,
205                                           lr.segment_size,
206                                           num_retries=num_retries)
207
208         self._filepos += len(data)
209         return data
210
211     @_FileLikeObjectBase._before_close
212     @retry_method
213     def readfrom(self, start, size, num_retries=None):
214         """Read up to 'size' bytes from the stream, starting at 'start'"""
215         if size == 0:
216             return ''
217
218         data = []
219         for lr in locators_and_ranges(self.segments, start, size):
220             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
221                                               num_retries=num_retries))
222         return ''.join(data)
223
224     def as_manifest(self):
225         segs = []
226         for r in self.segments:
227             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
228         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
229
230
231 def synchronized(orig_func):
232     @functools.wraps(orig_func)
233     def synchronized_wrapper(self, *args, **kwargs):
234         with self.lock:
235             return orig_func(self, *args, **kwargs)
236     return synchronized_wrapper
237
238 class _BufferBlock(object):
239     """A stand-in for a Keep block that is in the process of being written.
240
241     Writers can append to it, get the size, and compute the Keep locator.
242     There are three valid states:
243
244     WRITABLE
245       Can append to block.
246
247     PENDING
248       Block is in the process of being uploaded to Keep; appending is an error.
249
250     COMMITTED
251       The block has been written to Keep and its internal buffer released;
252       reads of the block are served through the Keep client (since the
253       internal copy was discarded), and identifiers referring to the
254       BufferBlock can be replaced with the block locator.
255
256     """
257
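    # Valid state transitions, enforced by set_state() below:
    #   WRITABLE -> PENDING    (commit_bufferblock() queues the block for upload)
    #   PENDING  -> COMMITTED  (the upload worker records the Keep locator)
    # Any other transition raises AssertionError.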
258     WRITABLE = 0
259     PENDING = 1
260     COMMITTED = 2
261
262     def __init__(self, blockid, starting_capacity, owner):
263         """
264         :blockid:
265           the identifier for this block
266
267         :starting_capacity:
268           the initial buffer capacity
269
270         :owner:
271           ArvadosFile that owns this block
272
273         """
274         self.blockid = blockid
275         self.buffer_block = bytearray(starting_capacity)
276         self.buffer_view = memoryview(self.buffer_block)
277         self.write_pointer = 0
278         self._state = _BufferBlock.WRITABLE
279         self._locator = None
280         self.owner = owner
281         self.lock = threading.Lock()
282
283     @synchronized
284     def append(self, data):
285         """Append some data to the buffer.
286
287         Only valid if the block is in WRITABLE state.  Implements an expanding
288         buffer, doubling capacity as needed to accommodate all the data.
289
290         """
291         if self._state == _BufferBlock.WRITABLE:
292             while (self.write_pointer+len(data)) > len(self.buffer_block):
293                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
294                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
295                 self.buffer_block = new_buffer_block
296                 self.buffer_view = memoryview(self.buffer_block)
297             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
298             self.write_pointer += len(data)
299             self._locator = None
300         else:
301             raise AssertionError("Buffer block is not writable")
302
303     @synchronized
304     def set_state(self, nextstate, loc=None):
305         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
306             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
307             self._state = nextstate
308             if self._state == _BufferBlock.COMMITTED:
309                 self._locator = loc
310                 self.buffer_view = None
311                 self.buffer_block = None
312         else:
313             raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
314
315     @synchronized
316     def state(self):
317         return self._state
318
319     def size(self):
320         """The amount of data written to the buffer."""
321         return self.write_pointer
322
323     @synchronized
324     def locator(self):
325         """The Keep locator for this buffer's contents."""
326         if self._locator is None:
327             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
328         return self._locator
329
330     @synchronized
331     def clone(self, new_blockid, owner):
332         if self._state == _BufferBlock.COMMITTED:
333             raise AssertionError("Can only duplicate a writable or pending buffer block")
334         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
335         bufferblock.append(self.buffer_view[0:self.size()])
336         return bufferblock
337
338
339 class NoopLock(object):
340     def __enter__(self):
341         return self
342
343     def __exit__(self, exc_type, exc_value, traceback):
344         pass
345
346     def acquire(self, blocking=False):
347         pass
348
349     def release(self):
350         pass
351
352
353 def must_be_writable(orig_func):
354     @functools.wraps(orig_func)
355     def must_be_writable_wrapper(self, *args, **kwargs):
356         if not self.writable():
357             raise IOError(errno.EROFS, "Collection must be writable.")
358         return orig_func(self, *args, **kwargs)
359     return must_be_writable_wrapper
360
361
362 class _BlockManager(object):
363     """BlockManager handles buffer blocks.
364
365     Also handles background block uploads, and background block prefetch for a
366     Collection of ArvadosFiles.
367
368     """
369     def __init__(self, keep):
370         """keep: KeepClient object to use"""
371         self._keep = keep
372         self._bufferblocks = {}
373         self._put_queue = None
374         self._put_errors = None
375         self._put_threads = None
376         self._prefetch_queue = None
377         self._prefetch_threads = None
378         self.lock = threading.Lock()
379         self.prefetch_enabled = True
380         self.num_put_threads = 2
381         self.num_get_threads = 2
382
383     @synchronized
384     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
385         """Allocate a new, empty bufferblock in WRITABLE state and return it.
386
387         :blockid:
388           optional block identifier, otherwise one will be automatically assigned
389
390         :starting_capacity:
391           optional capacity, otherwise will use default capacity
392
393         :owner:
394           ArvadosFile that owns this block
395
396         """
397         if blockid is None:
398             blockid = "bufferblock%i" % len(self._bufferblocks)
399         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
400         self._bufferblocks[bufferblock.blockid] = bufferblock
401         return bufferblock
402
403     @synchronized
404     def dup_block(self, block, owner):
405         """Create a new bufferblock initialized with the content of an existing bufferblock.
406
407         :block:
408           the buffer block to copy.
409
410         :owner:
411           ArvadosFile that owns the new block
412
413         """
414         new_blockid = "bufferblock%i" % len(self._bufferblocks)
415         bufferblock = block.clone(new_blockid, owner)
416         self._bufferblocks[bufferblock.blockid] = bufferblock
417         return bufferblock
418
419     @synchronized
420     def is_bufferblock(self, locator):
421         return locator in self._bufferblocks
422
423     @synchronized
424     def stop_threads(self):
425         """Shut down and wait for background upload and download threads to finish."""
426
427         if self._put_threads is not None:
428             for t in self._put_threads:
429                 self._put_queue.put(None)
430             for t in self._put_threads:
431                 t.join()
432         self._put_threads = None
433         self._put_queue = None
434         self._put_errors = None
435
436         if self._prefetch_threads is not None:
437             for t in self._prefetch_threads:
438                 self._prefetch_queue.put(None)
439             for t in self._prefetch_threads:
440                 t.join()
441         self._prefetch_threads = None
442         self._prefetch_queue = None
443
444     def commit_bufferblock(self, block):
445         """Initiate a background upload of a bufferblock.
446
447         This will block if the upload queue is at capacity; otherwise it
448         returns immediately.
449
450         """
451
452         def commit_bufferblock_worker(self):
453             """Background uploader thread."""
454
455             while True:
456                 try:
457                     bufferblock = self._put_queue.get()
458                     if bufferblock is None:
459                         return
460
461                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
462                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
463
464                 except Exception as e:
465                     self._put_errors.put((bufferblock.locator(), e))
466                 finally:
467                     if self._put_queue is not None:
468                         self._put_queue.task_done()
469
470         with self.lock:
471             if self._put_threads is None:
472                 # Start uploader threads.
473
474                 # If we don't limit the Queue size, the upload queue can quickly
475                 # grow to take up gigabytes of RAM if the writing process is
476                 # generating data more quickly than it can be sent to the Keep
477                 # servers.
478                 #
479                 # With two upload threads and a queue size of 2, this means up to 4
480                 # blocks pending.  If they are full 64 MiB blocks, that means up to
481                 # 256 MiB of internal buffering, which is the same size as the
482                 # default download block cache in KeepClient.
483                 self._put_queue = Queue.Queue(maxsize=2)
484                 self._put_errors = Queue.Queue()
485
486                 self._put_threads = []
487                 for i in xrange(0, self.num_put_threads):
488                     thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
489                     self._put_threads.append(thread)
490                     thread.daemon = True
491                     thread.start()
492
493         # Mark the block as PENDING to disallow any further appends.
494         block.set_state(_BufferBlock.PENDING)
495         self._put_queue.put(block)
496
497     @synchronized
498     def get_bufferblock(self, locator):
499         return self._bufferblocks.get(locator)
500
501     def get_block_contents(self, locator, num_retries, cache_only=False):
502         """Fetch a block.
503
504         First checks to see if the locator is a BufferBlock and returns that;
505         if not, passes the request through to KeepClient.get().
506
507         """
508         with self.lock:
509             if locator in self._bufferblocks:
510                 bufferblock = self._bufferblocks[locator]
511                 if bufferblock.state() != _BufferBlock.COMMITTED:
512                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
513                 else:
514                     locator = bufferblock._locator
515         if cache_only:
516             return self._keep.get_from_cache(locator)
517         else:
518             return self._keep.get(locator, num_retries=num_retries)
519
520     def commit_all(self):
521         """Commit all outstanding buffer blocks.
522
523         Unlike commit_bufferblock(), this is a synchronous call, and will not
524         return until all buffer blocks are uploaded.  Raises
525         KeepWriteError() if any blocks failed to upload.
526
527         """
528         with self.lock:
529             items = self._bufferblocks.items()
530
531         for k,v in items:
532             if v.state() == _BufferBlock.WRITABLE:
533                 self.commit_bufferblock(v)
534
535         with self.lock:
536             if self._put_queue is not None:
537                 self._put_queue.join()
538
539                 if not self._put_errors.empty():
540                     err = []
541                     try:
542                         while True:
543                             err.append(self._put_errors.get(False))
544                     except Queue.Empty:
545                         pass
546                     raise KeepWriteError("Error writing some blocks", err, label="block")
547
548     def block_prefetch(self, locator):
549         """Initiate a background download of a block.
550
551         This assumes that the underlying KeepClient implements a block cache,
552         so repeated requests for the same block will not result in repeated
553         downloads (unless the block is evicted from the cache).  This method
554         does not block.
555
556         """
557
558         if not self.prefetch_enabled:
559             return
560
561         def block_prefetch_worker(self):
562             """The background downloader thread."""
563             while True:
564                 try:
565                     b = self._prefetch_queue.get()
566                     if b is None:
567                         return
568                     self._keep.get(b)
569                 except Exception:
570                     pass
571
572         with self.lock:
573             if locator in self._bufferblocks:
574                 return
575             if self._prefetch_threads is None:
576                 self._prefetch_queue = Queue.Queue()
577                 self._prefetch_threads = []
578                 for i in xrange(0, self.num_get_threads):
579                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
580                     self._prefetch_threads.append(thread)
581                     thread.daemon = True
582                     thread.start()
583         self._prefetch_queue.put(locator)
584
585
586 class ArvadosFile(object):
587     """Represent a file in a Collection.
588
589     ArvadosFile manages the underlying representation of a file in Keep as a
590     sequence of segments spanning a set of blocks, and implements random
591     read/write access.
592
593     This object may be accessed from multiple threads.
594
595     """
596
597     def __init__(self, parent, name, stream=[], segments=[]):
598         """
599         ArvadosFile constructor.
600
601         :stream:
602           a list of Range objects representing a block stream
603
604         :segments:
605           a list of Range objects representing segments
606         """
607         self.parent = parent
608         self._modified = True
609         self._segments = []
610         self.lock = parent.root_collection().lock
611         for s in segments:
612             self._add_segment(stream, s.locator, s.range_size)
613         self._current_bblock = None
614         self.name = name
615
616     def writable(self):
617         return self.parent.writable()
618
619     @synchronized
620     def segments(self):
621         return copy.copy(self._segments)
622
623     @synchronized
624     def clone(self, new_parent, new_name):
625         """Make a copy of this file."""
626         cp = ArvadosFile(new_parent, new_name)
627         cp.replace_contents(self)
628         return cp
629
630     @must_be_writable
631     @synchronized
632     def replace_contents(self, other):
633         """Replace segments of this file with segments from another `ArvadosFile` object."""
634
635         map_loc = {}
636         self._segments = []
637         for other_segment in other.segments():
638             new_loc = other_segment.locator
639             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
640                 if other_segment.locator not in map_loc:
641                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
642                     if bufferblock.state() != _BufferBlock.WRITABLE:
643                         map_loc[other_segment.locator] = bufferblock.locator()
644                     else:
645                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
646                 new_loc = map_loc[other_segment.locator]
647
648             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
649
650         self._modified = True
651
652     def __eq__(self, other):
653         if other is self:
654             return True
655         if not isinstance(other, ArvadosFile):
656             return False
657
658         othersegs = other.segments()
659         with self.lock:
660             if len(self._segments) != len(othersegs):
661                 return False
662             for i in xrange(0, len(othersegs)):
663                 seg1 = self._segments[i]
664                 seg2 = othersegs[i]
665                 loc1 = seg1.locator
666                 loc2 = seg2.locator
667
668                 if self.parent._my_block_manager().is_bufferblock(loc1):
669                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
670
671                 if other.parent._my_block_manager().is_bufferblock(loc2):
672                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
673
674                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
675                     seg1.range_start != seg2.range_start or
676                     seg1.range_size != seg2.range_size or
677                     seg1.segment_offset != seg2.segment_offset):
678                     return False
679
680         return True
681
682     def __ne__(self, other):
683         return not self.__eq__(other)
684
685     @synchronized
686     def set_unmodified(self):
687         """Clear the modified flag"""
688         self._modified = False
689
690     @synchronized
691     def modified(self):
692         """Test the modified flag"""
693         return self._modified
694
695     @must_be_writable
696     @synchronized
697     def truncate(self, size):
698         """Shrink the size of the file.
699
700         If `size` is less than the size of the file, the file contents after
701         `size` will be discarded.  If `size` is greater than the current size
702         of the file, an IOError will be raised.
703
704         """
705         if size < self.size():
706             new_segs = []
707             for r in self._segments:
708                 range_end = r.range_start+r.range_size
709                 if r.range_start >= size:
710                     # segment is past the truncate size, all done
711                     break
712                 elif size < range_end:
713                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
714                     nr.segment_offset = r.segment_offset
715                     new_segs.append(nr)
716                     break
717                 else:
718                     new_segs.append(r)
719
720             self._segments = new_segs
721             self._modified = True
722         elif size > self.size():
723             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
724
725     def readfrom(self, offset, size, num_retries, exact=False):
726         """Read upto `size` bytes from the file starting at `offset`.
727
728         :exact:
729          If False (default), return less data than requested if the read
730          crosses a block boundary and the next block isn't cached.  If True,
731          only return less data than requested when hitting EOF.
732         """
733
734         with self.lock:
735             if size == 0 or offset >= self.size():
736                 return ''
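            # The prefetch list covers an extra KEEP_BLOCK_SIZE beyond the
            # requested range, so the following block is usually already in the
            # KeepClient cache by the time the caller's next read needs it.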
737             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
738             readsegs = locators_and_ranges(self._segments, offset, size)
739
740         for lr in prefetch:
741             self.parent._my_block_manager().block_prefetch(lr.locator)
742
743         data = []
744         for lr in readsegs:
745             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
746             if block:
747                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
748             else:
749                 break
750         return ''.join(data)
751
752     def _repack_writes(self):
753         """Test if the buffer block has more data than actual segments.
754
755         This happens when a buffered write overwrites a file range written in
756         a previous buffered write.  Re-pack the buffer block for efficiency
757         and to avoid leaking information.
758
759         """
760         segs = self._segments
761
762         # Sum up the segments to get the total bytes of the file referencing
763         # into the buffer block.
764         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
765         write_total = sum([s.range_size for s in bufferblock_segs])
766
767         if write_total < self._current_bblock.size():
768             # There is more data in the buffer block than is actually accounted for
769             # by the segments, so re-pack by copying the referenced ranges into a new buffer block.
770             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
771             for t in bufferblock_segs:
772                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
773                 t.segment_offset = new_bb.size() - t.range_size
774
775             self._current_bblock = new_bb
776
777     @must_be_writable
778     @synchronized
779     def writeto(self, offset, data, num_retries):
780         """Write `data` to the file starting at `offset`.
781
782         This will update existing bytes and/or extend the size of the file as
783         necessary.
784
785         """
786         if len(data) == 0:
787             return
788
789         if offset > self.size():
790             raise ArgumentError("Offset is past the end of the file")
791
792         if len(data) > config.KEEP_BLOCK_SIZE:
793             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
794
795         self._modified = True
796
797         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
798             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
799
800         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
801             self._repack_writes()
802             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
803                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
804                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
805
806         self._current_bblock.append(data)
807
808         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
809
810         self.parent.notify(MOD, self.parent, self.name, (self, self))
811
812         return len(data)
813
814     @synchronized
815     def flush(self):
816         if self.modified():
817             if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
818                 self._repack_writes()
819                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
820             self.parent.notify(MOD, self.parent, self.name, (self, self))
821
822     @must_be_writable
823     @synchronized
824     def add_segment(self, blocks, pos, size):
825         """Add a segment to the end of the file.
826
827         `pos` and `size` reference a section of the stream described by
828         `blocks` (a list of Range objects).
829
830         """
831         self._add_segment(blocks, pos, size)
832
833     def _add_segment(self, blocks, pos, size):
834         """Internal implementation of add_segment."""
835         self._modified = True
836         for lr in locators_and_ranges(blocks, pos, size):
837             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
838             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
839             self._segments.append(r)
840
841     @synchronized
842     def size(self):
843         """Get the file size."""
844         if self._segments:
845             n = self._segments[-1]
846             return n.range_start + n.range_size
847         else:
848             return 0
849
850     @synchronized
851     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
852         buf = ""
853         filestream = []
854         for segment in self._segments:
855             loc = segment.locator
856             if loc.startswith("bufferblock"):
857                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
858             if portable_locators:
859                 loc = KeepLocator(loc).stripped()
860             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
861                                  segment.segment_offset, segment.range_size))
862         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
863         buf += "\n"
864         return buf
865
866
867 class ArvadosFileReader(ArvadosFileReaderBase):
868     """Wraps ArvadosFile in a file-like object supporting reading only.
869
870     Be aware that this class is NOT thread safe as there is no locking around
871     updating the file pointer.
872
873     """
874
875     def __init__(self, arvadosfile, mode="r", num_retries=None):
876         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
877         self.arvadosfile = arvadosfile
878
879     def size(self):
880         return self.arvadosfile.size()
881
882     def stream_name(self):
883         return self.arvadosfile.parent.stream_name()
884
885     @_FileLikeObjectBase._before_close
886     @retry_method
887     def read(self, size=None, num_retries=None):
888         """Read up to `size` bytes from the file and return the result.
889
890         Starts at the current file position.  If `size` is None, read the
891         entire remainder of the file.
892         """
893         if size is None:
894             data = []
895             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
896             while rd:
897                 data.append(rd)
898                 self._filepos += len(rd)
899                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
900             return ''.join(data)
901         else:
902             data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
903             self._filepos += len(data)
904             return data
905
906     @_FileLikeObjectBase._before_close
907     @retry_method
908     def readfrom(self, offset, size, num_retries=None):
909         """Read up to `size` bytes from the stream, starting at the specified file offset.
910
911         This method does not change the file position.
912         """
913         return self.arvadosfile.readfrom(offset, size, num_retries)
914
915     def flush(self):
916         pass
917
918
919 class ArvadosFileWriter(ArvadosFileReader):
920     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
921
922     Be aware that this class is NOT thread safe as there is no locking around
923     updating the file pointer.
924
925     """
926
927     def __init__(self, arvadosfile, mode, num_retries=None):
928         super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
929
930     @_FileLikeObjectBase._before_close
931     @retry_method
932     def write(self, data, num_retries=None):
933         if self.mode[0] == "a":
934             self.arvadosfile.writeto(self.size(), data, num_retries)
935         else:
936             self.arvadosfile.writeto(self._filepos, data, num_retries)
937             self._filepos += len(data)
938         return len(data)
939
940     @_FileLikeObjectBase._before_close
941     @retry_method
942     def writelines(self, seq, num_retries=None):
943         for s in seq:
944             self.write(s, num_retries)
945
946     @_FileLikeObjectBase._before_close
947     def truncate(self, size=None):
948         if size is None:
949             size = self._filepos
950         self.arvadosfile.truncate(size)
951         if self._filepos > self.size():
952             self._filepos = self.size()
953
954     @_FileLikeObjectBase._before_close
955     def flush(self):
956         self.arvadosfile.flush()
957
958     def close(self):
959         if not self.closed:
960             self.flush()
961             super(ArvadosFileWriter, self).close()
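
# A minimal usage sketch (not part of this module): it assumes the Collection
# class from arvados.collection, whose open() method returns the
# ArvadosFileWriter / ArvadosFileReader wrappers defined above.
#
#     import arvados.collection
#     c = arvados.collection.Collection()
#     with c.open('hello.txt', 'w') as f:    # ArvadosFileWriter
#         f.write('hello world\n')
#     with c.open('hello.txt', 'r') as f:    # ArvadosFileReader
#         print f.read(64)
#     print c.manifest_text()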