Merge branch '3198-inode-cache' into 3198-writable-fuse, fix tests.
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import ArgumentError, KeepWriteError, AssertionError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
18 from .retry import retry_method
19
20 MOD = "mod"
21
22 _logger = logging.getLogger('arvados.arvfile')
23
24 def split(path):
25     """split(path) -> streamname, filename
26
27     Separate the stream name and file name in a /-separated stream path and
28     return a tuple (stream_name, file_name).  If no stream name is available,
29     assume '.'.
30
31     """
32     try:
33         stream_name, file_name = path.rsplit('/', 1)
34     except ValueError:  # No / in string
35         stream_name, file_name = '.', path
36     return stream_name, file_name
37
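# Illustrative example (editor's addition, not part of the original module):
# split() separates the stream name from the file name, defaulting the stream
# to '.' when the path contains no '/':
#
#   split('foo/bar/baz.txt')  # -> ('foo/bar', 'baz.txt')
#   split('baz.txt')          # -> ('.', 'baz.txt')
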
38 class _FileLikeObjectBase(object):
39     def __init__(self, name, mode):
40         self.name = name
41         self.mode = mode
42         self.closed = False
43
44     @staticmethod
45     def _before_close(orig_func):
46         @functools.wraps(orig_func)
47         def before_close_wrapper(self, *args, **kwargs):
48             if self.closed:
49                 raise ValueError("I/O operation on closed stream file")
50             return orig_func(self, *args, **kwargs)
51         return before_close_wrapper
52
53     def __enter__(self):
54         return self
55
56     def __exit__(self, exc_type, exc_value, traceback):
57         try:
58             self.close()
59         except Exception:
60             if exc_type is None:
61                 raise
62
63     def close(self):
64         self.closed = True
65
66
67 class ArvadosFileReaderBase(_FileLikeObjectBase):
68     def __init__(self, name, mode, num_retries=None):
69         super(ArvadosFileReaderBase, self).__init__(name, mode)
70         self._filepos = 0L
71         self.num_retries = num_retries
72         self._readline_cache = (None, None)
73
74     def __iter__(self):
75         while True:
76             data = self.readline()
77             if not data:
78                 break
79             yield data
80
81     def decompressed_name(self):
82         return re.sub(r'\.(bz2|gz)$', '', self.name)
83
84     @_FileLikeObjectBase._before_close
85     def seek(self, pos, whence=os.SEEK_SET):
86         if whence == os.SEEK_CUR:
87             pos += self._filepos
88         elif whence == os.SEEK_END:
89             pos += self.size()
90         self._filepos = min(max(pos, 0L), self.size())
91
92     def tell(self):
93         return self._filepos
94
95     @_FileLikeObjectBase._before_close
96     @retry_method
97     def readall(self, size=2**20, num_retries=None):
98         while True:
99             data = self.read(size, num_retries=num_retries)
100             if data == '':
101                 break
102             yield data
103
104     @_FileLikeObjectBase._before_close
105     @retry_method
106     def readline(self, size=float('inf'), num_retries=None):
107         cache_pos, cache_data = self._readline_cache
108         if self.tell() == cache_pos:
109             data = [cache_data]
110         else:
111             data = ['']
112         data_size = len(data[-1])
113         while (data_size < size) and ('\n' not in data[-1]):
114             next_read = self.read(2 ** 20, num_retries=num_retries)
115             if not next_read:
116                 break
117             data.append(next_read)
118             data_size += len(next_read)
119         data = ''.join(data)
120         try:
121             nextline_index = data.index('\n') + 1
122         except ValueError:
123             nextline_index = len(data)
124         nextline_index = min(nextline_index, size)
125         self._readline_cache = (self.tell(), data[nextline_index:])
126         return data[:nextline_index]
127
128     @_FileLikeObjectBase._before_close
129     @retry_method
130     def decompress(self, decompress, size, num_retries=None):
131         for segment in self.readall(size, num_retries):
132             data = decompress(segment)
133             if data:
134                 yield data
135
136     @_FileLikeObjectBase._before_close
137     @retry_method
138     def readall_decompressed(self, size=2**20, num_retries=None):
139         self.seek(0)
140         if self.name.endswith('.bz2'):
141             dc = bz2.BZ2Decompressor()
142             return self.decompress(dc.decompress, size,
143                                    num_retries=num_retries)
144         elif self.name.endswith('.gz'):
145             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
146             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
147                                    size, num_retries=num_retries)
148         else:
149             return self.readall(size, num_retries=num_retries)
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def readlines(self, sizehint=float('inf'), num_retries=None):
154         data = []
155         data_size = 0
156         for s in self.readall(num_retries=num_retries):
157             data.append(s)
158             data_size += len(s)
159             if data_size >= sizehint:
160                 break
161         return ''.join(data).splitlines(True)
162
163     def size(self):
164         raise NotImplementedError()
165
166     def read(self, size, num_retries=None):
167         raise NotImplementedError()
168
169     def readfrom(self, start, size, num_retries=None):
170         raise NotImplementedError()
171
172
173 class StreamFileReader(ArvadosFileReaderBase):
174     class _NameAttribute(str):
175         # The Python file API provides a plain .name attribute.
176         # Older SDK provided a name() method.
177         # This class provides both, for maximum compatibility.
178         def __call__(self):
179             return self
180
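    # Editor's note (illustrative, assumed usage): thanks to _NameAttribute,
    # both spellings work on a StreamFileReader instance:
    #   reader.name    -> 'foo.txt'   (plain attribute, like Python file objects)
    #   reader.name()  -> 'foo.txt'   (older SDK method style)
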
181     def __init__(self, stream, segments, name):
182         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
183         self._stream = stream
184         self.segments = segments
185
186     def stream_name(self):
187         return self._stream.name()
188
189     def size(self):
190         n = self.segments[-1]
191         return n.range_start + n.range_size
192
193     @_FileLikeObjectBase._before_close
194     @retry_method
195     def read(self, size, num_retries=None):
196         """Read up to 'size' bytes from the stream, starting at the current file position"""
197         if size == 0:
198             return ''
199
200         data = ''
201         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
202         if available_chunks:
203             lr = available_chunks[0]
204             data = self._stream.readfrom(lr.locator+lr.segment_offset,
205                                           lr.segment_size,
206                                           num_retries=num_retries)
207
208         self._filepos += len(data)
209         return data
210
211     @_FileLikeObjectBase._before_close
212     @retry_method
213     def readfrom(self, start, size, num_retries=None):
214         """Read up to 'size' bytes from the stream, starting at 'start'"""
215         if size == 0:
216             return ''
217
218         data = []
219         for lr in locators_and_ranges(self.segments, start, size):
220             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
221                                               num_retries=num_retries))
222         return ''.join(data)
223
224     def as_manifest(self):
225         segs = []
226         for r in self.segments:
227             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
228         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
229
230
231 def synchronized(orig_func):
232     @functools.wraps(orig_func)
233     def synchronized_wrapper(self, *args, **kwargs):
234         with self.lock:
235             return orig_func(self, *args, **kwargs)
236     return synchronized_wrapper
237
238 class _BufferBlock(object):
239     """A stand-in for a Keep block that is in the process of being written.
240
241     Writers can append to it, get the size, and compute the Keep locator.
242     There are three valid states:
243
244     WRITABLE
245       Can append to block.
246
247     PENDING
248       Block is in the process of being uploaded to Keep, append is an error.
249
250     COMMITTED
251       The block has been written to Keep, its internal buffer has been
252       released, fetching the block will fetch it via keep client (since we
253       discarded the internal copy), and identifiers referring to the BufferBlock
254       can be replaced with the block locator.
255
256     """
257
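    # Editor's sketch (derived from set_state() below): the only legal state
    # transitions are WRITABLE -> PENDING -> COMMITTED; any other transition
    # raises AssertionError.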
258     WRITABLE = 0
259     PENDING = 1
260     COMMITTED = 2
261
262     def __init__(self, blockid, starting_capacity, owner):
263         """
264         :blockid:
265           the identifier for this block
266
267         :starting_capacity:
268           the initial buffer capacity
269
270         :owner:
271           ArvadosFile that owns this block
272
273         """
274         self.blockid = blockid
275         self.buffer_block = bytearray(starting_capacity)
276         self.buffer_view = memoryview(self.buffer_block)
277         self.write_pointer = 0
278         self._state = _BufferBlock.WRITABLE
279         self._locator = None
280         self.owner = owner
281         self.lock = threading.Lock()
282
283     @synchronized
284     def append(self, data):
285         """Append some data to the buffer.
286
287         Only valid if the block is in WRITABLE state.  Implements an expanding
288         buffer, doubling capacity as needed to accommodate all the data.
289
290         """
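        # Editor's illustrative example (hypothetical sizes): with the default
        # 16 KiB starting capacity, appending 20 KiB doubles the buffer to
        # 32 KiB before copying, so the bytes already written are preserved and
        # the new data fits after write_pointer.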
291         if self._state == _BufferBlock.WRITABLE:
292             while (self.write_pointer+len(data)) > len(self.buffer_block):
293                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
294                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
295                 self.buffer_block = new_buffer_block
296                 self.buffer_view = memoryview(self.buffer_block)
297             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
298             self.write_pointer += len(data)
299             self._locator = None
300         else:
301             raise AssertionError("Buffer block is not writable")
302
303     @synchronized
304     def set_state(self, nextstate, loc=None):
305         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
306             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
307             self._state = nextstate
308             if self._state == _BufferBlock.COMMITTED:
309                 self._locator = loc
310                 self.buffer_view = None
311                 self.buffer_block = None
312         else:
313             raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
314
315     @synchronized
316     def state(self):
317         return self._state
318
319     def size(self):
320         """The amount of data written to the buffer."""
321         return self.write_pointer
322
323     @synchronized
324     def locator(self):
325         """The Keep locator for this buffer's contents."""
326         if self._locator is None:
327             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
328         return self._locator
329
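    # Editor's note (illustrative): locator() yields the usual Keep form
    # "<md5 of contents>+<size in bytes>"; for example, an empty buffer would
    # produce "d41d8cd98f00b204e9800998ecf8427e+0".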
330     @synchronized
331     def clone(self, new_blockid, owner):
332         if self._state == _BufferBlock.COMMITTED:
333             raise AssertionError("Can only duplicate a writable or pending buffer block")
334         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
335         bufferblock.append(self.buffer_view[0:self.size()])
336         return bufferblock
337
338
339 class NoopLock(object):
340     def __enter__(self):
341         return self
342
343     def __exit__(self, exc_type, exc_value, traceback):
344         pass
345
346     def acquire(self, blocking=False):
347         pass
348
349     def release(self):
350         pass
351
352
353 def must_be_writable(orig_func):
354     @functools.wraps(orig_func)
355     def must_be_writable_wrapper(self, *args, **kwargs):
356         if not self.writable():
357             raise IOError(errno.EROFS, "Collection must be writable.")
358         return orig_func(self, *args, **kwargs)
359     return must_be_writable_wrapper
360
361
362 class _BlockManager(object):
363     """BlockManager handles buffer blocks.
364
365     Also handles background block uploads, and background block prefetch for a
366     Collection of ArvadosFiles.
367
368     """
369     def __init__(self, keep):
370         """keep: KeepClient object to use"""
371         self._keep = keep
372         self._bufferblocks = {}
373         self._put_queue = None
374         self._put_errors = None
375         self._put_threads = None
376         self._prefetch_queue = None
377         self._prefetch_threads = None
378         self.lock = threading.Lock()
379         self.prefetch_enabled = True
380         self.num_put_threads = 2
381         self.num_get_threads = 2
382
383     @synchronized
384     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
385         """Allocate a new, empty bufferblock in WRITABLE state and return it.
386
387         :blockid:
388           optional block identifier, otherwise one will be automatically assigned
389
390         :starting_capacity:
391           optional capacity, otherwise will use default capacity
392
393         :owner:
394           ArvadosFile that owns this block
395
396         """
397         if blockid is None:
398             blockid = "bufferblock%i" % len(self._bufferblocks)
399         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
400         self._bufferblocks[bufferblock.blockid] = bufferblock
401         return bufferblock
402
403     @synchronized
404     def dup_block(self, block, owner):
405         """Create a new bufferblock initialized with the content of an existing bufferblock.
406
407         :block:
408           the buffer block to copy.
409
410         :owner:
411           ArvadosFile that owns the new block
412
413         """
414         new_blockid = "bufferblock%i" % len(self._bufferblocks)
415         bufferblock = block.clone(new_blockid, owner)
416         self._bufferblocks[bufferblock.blockid] = bufferblock
417         return bufferblock
418
419     @synchronized
420     def is_bufferblock(self, locator):
421         return locator in self._bufferblocks
422
423     @synchronized
424     def stop_threads(self):
425         """Shut down and wait for background upload and download threads to finish."""
426
427         if self._put_threads is not None:
428             for t in self._put_threads:
429                 self._put_queue.put(None)
430             for t in self._put_threads:
431                 t.join()
432         self._put_threads = None
433         self._put_queue = None
434         self._put_errors = None
435
436         if self._prefetch_threads is not None:
437             for t in self._prefetch_threads:
438                 self._prefetch_queue.put(None)
439             for t in self._prefetch_threads:
440                 t.join()
441         self._prefetch_threads = None
442         self._prefetch_queue = None
443
444     def commit_bufferblock(self, block, wait):
445         """Initiate a background upload of a bufferblock.
446
447         :block:
448           The block object to upload
449
450         :wait:
451           If `wait` is True, upload the block synchronously.
452           If `wait` is False, upload the block asynchronously.  This will
453           return immediately unless the upload queue is at capacity, in
454           which case it will wait on an upload queue slot.
455
456         """
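        # Editor's sketch (assumed usage by callers such as ArvadosFile.flush):
        #   manager.commit_bufferblock(block, True)    # upload now, return when done
        #   manager.commit_bufferblock(block, False)   # queue for the uploader threads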
457
458         def commit_bufferblock_worker(self):
459             """Background uploader thread."""
460
461             while True:
462                 try:
463                     bufferblock = self._put_queue.get()
464                     if bufferblock is None:
465                         return
466
467                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
468                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
469
470                 except Exception as e:
471                     self._put_errors.put((bufferblock.locator(), e))
472                 finally:
473                     if self._put_queue is not None:
474                         self._put_queue.task_done()
475
476         if block.state() != _BufferBlock.WRITABLE:
477             return
478
479         if wait:
480             block.set_state(_BufferBlock.PENDING)
481             loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
482             block.set_state(_BufferBlock.COMMITTED, loc)
483         else:
484             with self.lock:
485                 if self._put_threads is None:
486                     # Start uploader threads.
487
488                     # If we don't limit the Queue size, the upload queue can quickly
489                     # grow to take up gigabytes of RAM if the writing process is
490                     # generating data more quickly than it can be sent to the Keep
491                     # servers.
492                     #
493                     # With two upload threads and a queue size of 2, this means up to 4
494                     # blocks pending.  If they are full 64 MiB blocks, that means up to
495                     # 256 MiB of internal buffering, which is the same size as the
496                     # default download block cache in KeepClient.
497                     self._put_queue = Queue.Queue(maxsize=2)
498                     self._put_errors = Queue.Queue()
499
500                     self._put_threads = []
501                     for i in xrange(0, self.num_put_threads):
502                         thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
503                         self._put_threads.append(thread)
504                         thread.daemon = True
505                         thread.start()
506
507             # Mark the block as PENDING so as to disallow any more appends.
508             block.set_state(_BufferBlock.PENDING)
509             self._put_queue.put(block)
510
511     @synchronized
512     def get_bufferblock(self, locator):
513         return self._bufferblocks.get(locator)
514
515     def get_block_contents(self, locator, num_retries, cache_only=False):
516         """Fetch a block.
517
518         First checks to see if the locator is a BufferBlock and returns that; if
519         not, passes the request through to KeepClient.get().
520
521         """
522         with self.lock:
523             if locator in self._bufferblocks:
524                 bufferblock = self._bufferblocks[locator]
525                 if bufferblock.state() != _BufferBlock.COMMITTED:
526                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
527                 else:
528                     locator = bufferblock._locator
529         if cache_only:
530             return self._keep.get_from_cache(locator)
531         else:
532             return self._keep.get(locator, num_retries=num_retries)
533
534     def commit_all(self):
535         """Commit all outstanding buffer blocks.
536
537         Unlike commit_bufferblock(), this is a synchronous call, and will not
538         return until all buffer blocks are uploaded.  Raises
539         KeepWriteError() if any blocks failed to upload.
540
541         """
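        # Editor's sketch (illustrative caller, not part of the module):
        #   try:
        #       block_manager.commit_all()
        #   except KeepWriteError as e:
        #       # e carries one (locator, exception) pair per block that failed
        #       logging.error("some blocks failed to upload: %s", e)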
542         with self.lock:
543             items = self._bufferblocks.items()
544
545         for k,v in items:
546             if v.state() == _BufferBlock.WRITABLE:
547                 v.owner.flush(False)
548
549         with self.lock:
550             if self._put_queue is not None:
551                 self._put_queue.join()
552
553                 if not self._put_errors.empty():
554                     err = []
555                     try:
556                         while True:
557                             err.append(self._put_errors.get(False))
558                     except Queue.Empty:
559                         pass
560                     raise KeepWriteError("Error writing some blocks", err, label="block")
561
562     def block_prefetch(self, locator):
563         """Initiate a background download of a block.
564
565         This assumes that the underlying KeepClient implements a block cache,
566         so repeated requests for the same block will not result in repeated
567         downloads (unless the block is evicted from the cache.)  This method
568         does not block.
569
570         """
571
572         if not self.prefetch_enabled:
573             return
574
575         def block_prefetch_worker(self):
576             """The background downloader thread."""
577             while True:
578                 try:
579                     b = self._prefetch_queue.get()
580                     if b is None:
581                         return
582                     self._keep.get(b)
583                 except Exception:
584                     pass
585
586         with self.lock:
587             if locator in self._bufferblocks:
588                 return
589             if self._prefetch_threads is None:
590                 self._prefetch_queue = Queue.Queue()
591                 self._prefetch_threads = []
592                 for i in xrange(0, self.num_get_threads):
593                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
594                     self._prefetch_threads.append(thread)
595                     thread.daemon = True
596                     thread.start()
597         self._prefetch_queue.put(locator)
598
599
600 class ArvadosFile(object):
601     """Represent a file in a Collection.
602
603     ArvadosFile manages the underlying representation of a file in Keep as a
604     sequence of segments spanning a set of blocks, and implements random
605     read/write access.
606
607     This object may be accessed from multiple threads.
608
609     """
610
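    # Editor's sketch (hypothetical, assumes `parent` is a writable collection
    # node that supplies the lock and block manager):
    #   af = ArvadosFile(parent, "stats.txt")
    #   af.writeto(0, "hello world", num_retries=0)
    #   af.readfrom(0, 5, num_retries=0)   # -> "hello"
    #   af.size()                          # -> 11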
611     def __init__(self, parent, name, stream=[], segments=[]):
612         """
613         ArvadosFile constructor.
614
615         :stream:
616           a list of Range objects representing a block stream
617
618         :segments:
619           a list of Range objects representing segments
620         """
621         self.parent = parent
622         self.name = name
623         self._modified = True
624         self._segments = []
625         self.lock = parent.root_collection().lock
626         for s in segments:
627             self._add_segment(stream, s.locator, s.range_size)
628         self._current_bblock = None
629
630     def writable(self):
631         return self.parent.writable()
632
633     @synchronized
634     def segments(self):
635         return copy.copy(self._segments)
636
637     @synchronized
638     def clone(self, new_parent, new_name):
639         """Make a copy of this file."""
640         cp = ArvadosFile(new_parent, new_name)
641         cp.replace_contents(self)
642         return cp
643
644     @must_be_writable
645     @synchronized
646     def replace_contents(self, other):
647         """Replace segments of this file with segments from another `ArvadosFile` object."""
648
649         map_loc = {}
650         self._segments = []
651         for other_segment in other.segments():
652             new_loc = other_segment.locator
653             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
654                 if other_segment.locator not in map_loc:
655                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
656                     if bufferblock.state() != _BufferBlock.WRITABLE:
657                         map_loc[other_segment.locator] = bufferblock.locator()
658                     else:
659                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
660                 new_loc = map_loc[other_segment.locator]
661
662             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
663
664         self._modified = True
665
666     def __eq__(self, other):
667         if other is self:
668             return True
669         if not isinstance(other, ArvadosFile):
670             return False
671
672         othersegs = other.segments()
673         with self.lock:
674             if len(self._segments) != len(othersegs):
675                 return False
676             for i in xrange(0, len(othersegs)):
677                 seg1 = self._segments[i]
678                 seg2 = othersegs[i]
679                 loc1 = seg1.locator
680                 loc2 = seg2.locator
681
682                 if self.parent._my_block_manager().is_bufferblock(loc1):
683                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
684
685                 if other.parent._my_block_manager().is_bufferblock(loc2):
686                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
687
688                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
689                     seg1.range_start != seg2.range_start or
690                     seg1.range_size != seg2.range_size or
691                     seg1.segment_offset != seg2.segment_offset):
692                     return False
693
694         return True
695
696     def __ne__(self, other):
697         return not self.__eq__(other)
698
699     @synchronized
700     def set_unmodified(self):
701         """Clear the modified flag"""
702         self._modified = False
703
704     @synchronized
705     def modified(self):
706         """Test the modified flag"""
707         return self._modified
708
709     @must_be_writable
710     @synchronized
711     def truncate(self, size):
712         """Shrink the size of the file.
713
714         If `size` is less than the size of the file, the file contents after
715         `size` will be discarded.  If `size` is greater than the current size
716         of the file, an IOError will be raised.
717
718         """
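        # Editor's illustrative example: for a 10-byte file,
        #   af.truncate(4)    # keeps bytes 0..3, discards the rest
        #   af.truncate(20)   # raises IOError(EINVAL); extending is unsupported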
719         if size < self.size():
720             new_segs = []
721             for r in self._segments:
722                 range_end = r.range_start+r.range_size
723                 if r.range_start >= size:
724                     # segment is past the truncate size, all done
725                     break
726                 elif size < range_end:
727                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
728                     nr.segment_offset = r.segment_offset
729                     new_segs.append(nr)
730                     break
731                 else:
732                     new_segs.append(r)
733
734             self._segments = new_segs
735             self._modified = True
736         elif size > self.size():
737             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
738
739     def readfrom(self, offset, size, num_retries, exact=False):
740         """Read upto `size` bytes from the file starting at `offset`.
741
742         :exact:
743          If False (default), return less data than requested if the read
744          crosses a block boundary and the next block isn't cached.  If True,
745          only return less data than requested when hitting EOF.
746         """
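        # Editor's illustrative example: if a file's bytes 0..99 live in block A
        # and bytes 100..199 in block B, readfrom(50, 100, ..., exact=False) may
        # return only bytes 50..99 when B is not yet cached, while exact=True
        # keeps reading until 100 bytes are returned or EOF is reached.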
747
748         with self.lock:
749             if size == 0 or offset >= self.size():
750                 return ''
751             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
752             readsegs = locators_and_ranges(self._segments, offset, size)
753
754         for lr in prefetch:
755             self.parent._my_block_manager().block_prefetch(lr.locator)
756
757         data = []
758         for lr in readsegs:
759             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
760             if block:
761                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
762             else:
763                 break
764         return ''.join(data)
765
766     def _repack_writes(self, num_retries):
767         """Test if the buffer block has more data than actual segments.
768
769         This happens when a buffered write over-writes a file range written in
770         a previous buffered write.  Re-pack the buffer block for efficiency
771         and to avoid leaking information.
772
773         """
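        # Editor's illustrative example: writing 10 bytes at offset 0 and then
        # rewriting the same 10 bytes leaves 20 bytes in the buffer block but
        # only 10 referenced by segments; repacking copies just the 10 live
        # bytes into a fresh buffer block.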
774         segs = self._segments
775
776         # Sum up the segments to get the total bytes of the file referencing
777         # into the buffer block.
778         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
779         write_total = sum([s.range_size for s in bufferblock_segs])
780
781         if write_total < self._current_bblock.size():
782             # There is more data in the buffer block than is actually accounted for by segments, so
783             # re-pack into a new buffer by copying over to a new buffer block.
784             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
785             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
786             for t in bufferblock_segs:
787                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
788                 t.segment_offset = new_bb.size() - t.range_size
789
790             self._current_bblock = new_bb
791
792     @must_be_writable
793     @synchronized
794     def writeto(self, offset, data, num_retries):
795         """Write `data` to the file starting at `offset`.
796
797         This will update existing bytes and/or extend the size of the file as
798         necessary.
799
800         """
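        # Editor's sketch (hypothetical offsets): starting from an empty file,
        #   af.writeto(0, "abcdef", 0)   # file is now "abcdef"
        #   af.writeto(3, "XY", 0)       # overwrites in place -> "abcXYf"
        #   af.writeto(6, "!", 0)        # appends at EOF -> "abcXYf!"
        # Writing past the current end (offset > size()) raises ArgumentError.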
801         if len(data) == 0:
802             return
803
804         if offset > self.size():
805             raise ArgumentError("Offset is past the end of the file")
806
807         if len(data) > config.KEEP_BLOCK_SIZE:
808             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
809
810         self._modified = True
811
812         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
813             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
814
815         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
816             self._repack_writes(num_retries)
817             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
818                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
819                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
820
821         self._current_bblock.append(data)
822
823         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
824
825         self.parent.notify(MOD, self.parent, self.name, (self, self))
826
827         return len(data)
828
829     @synchronized
830     def flush(self, wait=True, num_retries=0):
831         if self.modified():
832             if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
833                 self._repack_writes(num_retries)
834                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
835             self.parent.notify(MOD, self.parent, self.name, (self, self))
836
837     @must_be_writable
838     @synchronized
839     def add_segment(self, blocks, pos, size):
840         """Add a segment to the end of the file.
841
842         `pos` and `size` reference a section of the stream described by
843         `blocks` (a list of Range objects)
844
845         """
846         self._add_segment(blocks, pos, size)
847
848     def _add_segment(self, blocks, pos, size):
849         """Internal implementation of add_segment."""
850         self._modified = True
851         for lr in locators_and_ranges(blocks, pos, size):
852             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
853             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
854             self._segments.append(r)
855
856     @synchronized
857     def size(self):
858         """Get the file size."""
859         if self._segments:
860             n = self._segments[-1]
861             return n.range_start + n.range_size
862         else:
863             return 0
864
865     @synchronized
866     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
867         buf = ""
868         filestream = []
869         for segment in self._segments:
870             loc = segment.locator
871             if self.parent._my_block_manager().is_bufferblock(loc):
872                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
873             if portable_locators:
874                 loc = KeepLocator(loc).stripped()
875             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
876                                  segment.segment_offset, segment.range_size))
877         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
878         buf += "\n"
879         return buf
880
881     @must_be_writable
882     @synchronized
883     def reparent(self, newparent, newname):
884         self.flush()
885         self.parent.remove(self.name)
886
887         self.parent = newparent
888         self.name = newname
889         self.lock = self.parent.root_collection().lock
890         self._modified = True
891
892 class ArvadosFileReader(ArvadosFileReaderBase):
893     """Wraps ArvadosFile in a file-like object supporting reading only.
894
895     Be aware that this class is NOT thread safe as there is no locking around
896     updating file pointer.
897
898     """
899
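    # Editor's sketch (hypothetical usage; the underlying ArvadosFile normally
    # comes from a Collection):
    #   reader = ArvadosFileReader(arvados_file)
    #   reader.seek(128)
    #   chunk = reader.read(1024)   # reads from offset 128 and advances _filepos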
900     def __init__(self, arvadosfile, mode="r", num_retries=None):
901         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
902         self.arvadosfile = arvadosfile
903
904     def size(self):
905         return self.arvadosfile.size()
906
907     def stream_name(self):
908         return self.arvadosfile.parent.stream_name()
909
910     @_FileLikeObjectBase._before_close
911     @retry_method
912     def read(self, size=None, num_retries=None):
913         """Read up to `size` bytes from the file and return the result.
914
915         Starts at the current file position.  If `size` is None, read the
916         entire remainder of the file.
917         """
918         if size is None:
919             data = []
920             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
921             while rd:
922                 data.append(rd)
923                 self._filepos += len(rd)
924                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
925             return ''.join(data)
926         else:
927             data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
928             self._filepos += len(data)
929             return data
930
931     @_FileLikeObjectBase._before_close
932     @retry_method
933     def readfrom(self, offset, size, num_retries=None):
934         """Read up to `size` bytes from the stream, starting at the specified file offset.
935
936         This method does not change the file position.
937         """
938         return self.arvadosfile.readfrom(offset, size, num_retries)
939
940     def flush(self):
941         pass
942
943
944 class ArvadosFileWriter(ArvadosFileReader):
945     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
946
947     Be aware that this class is NOT thread safe as there is no locking around
948     updating file pointer.
949
950     """
951
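    # Editor's sketch (hypothetical usage): mode "a" always appends at EOF,
    # while any other mode writes at the current file position:
    #   writer = ArvadosFileWriter(arvados_file, "r+")
    #   writer.write("header\n")   # writes at offset 0 and advances the pointer
    #   writer.flush()             # pushes the current buffer block to Keep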
952     def __init__(self, arvadosfile, mode, num_retries=None):
953         super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
954
955     @_FileLikeObjectBase._before_close
956     @retry_method
957     def write(self, data, num_retries=None):
958         if self.mode[0] == "a":
959             self.arvadosfile.writeto(self.size(), data, num_retries)
960         else:
961             self.arvadosfile.writeto(self._filepos, data, num_retries)
962             self._filepos += len(data)
963         return len(data)
964
965     @_FileLikeObjectBase._before_close
966     @retry_method
967     def writelines(self, seq, num_retries=None):
968         for s in seq:
969             self.write(s, num_retries)
970
971     @_FileLikeObjectBase._before_close
972     def truncate(self, size=None):
973         if size is None:
974             size = self._filepos
975         self.arvadosfile.truncate(size)
976         if self._filepos > self.size():
977             self._filepos = self.size()
978
979     @_FileLikeObjectBase._before_close
980     def flush(self):
981         self.arvadosfile.flush()
982
983     def close(self):
984         if not self.closed:
985             self.flush()
986             super(ArvadosFileWriter, self).close()