sdk/python/arvados/arvfile.py (arvados.git)
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
18 from .retry import retry_method
19
20 MOD = "mod"
21
22 _logger = logging.getLogger('arvados.arvfile')
23
24 def split(path):
25     """split(path) -> streamname, filename
26
27     Separate the stream name and file name in a /-separated stream path and
28     return a tuple (stream_name, file_name).  If no stream name is available,
29     assume '.'.
30
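    For example, split("params/stats.txt") returns ("params", "stats.txt"),
    and split("stats.txt") returns (".", "stats.txt").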
31     """
32     try:
33         stream_name, file_name = path.rsplit('/', 1)
34     except ValueError:  # No / in string
35         stream_name, file_name = '.', path
36     return stream_name, file_name
37
38 class _FileLikeObjectBase(object):
39     def __init__(self, name, mode):
40         self.name = name
41         self.mode = mode
42         self.closed = False
43
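    # Decorator for methods of file-like objects: raise ValueError if the
    # object has already been closed, otherwise call the wrapped method.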
44     @staticmethod
45     def _before_close(orig_func):
46         @functools.wraps(orig_func)
47         def before_close_wrapper(self, *args, **kwargs):
48             if self.closed:
49                 raise ValueError("I/O operation on closed stream file")
50             return orig_func(self, *args, **kwargs)
51         return before_close_wrapper
52
53     def __enter__(self):
54         return self
55
56     def __exit__(self, exc_type, exc_value, traceback):
57         try:
58             self.close()
59         except Exception:
60             if exc_type is None:
61                 raise
62
63     def close(self):
64         self.closed = True
65
66
67 class ArvadosFileReaderBase(_FileLikeObjectBase):
68     def __init__(self, name, mode, num_retries=None):
69         super(ArvadosFileReaderBase, self).__init__(name, mode)
70         self._filepos = 0L
71         self.num_retries = num_retries
72         self._readline_cache = (None, None)
73
74     def __iter__(self):
75         while True:
76             data = self.readline()
77             if not data:
78                 break
79             yield data
80
81     def decompressed_name(self):
82         return re.sub(r'\.(bz2|gz)$', '', self.name)
83
84     @_FileLikeObjectBase._before_close
85     def seek(self, pos, whence=os.SEEK_SET):
86         if whence == os.SEEK_CUR:
87             pos += self._filepos
88         elif whence == os.SEEK_END:
89             pos += self.size()
90         self._filepos = min(max(pos, 0L), self.size())
91
92     def tell(self):
93         return self._filepos
94
95     @_FileLikeObjectBase._before_close
96     @retry_method
97     def readall(self, size=2**20, num_retries=None):
98         while True:
99             data = self.read(size, num_retries=num_retries)
100             if data == '':
101                 break
102             yield data
103
104     @_FileLikeObjectBase._before_close
105     @retry_method
106     def readline(self, size=float('inf'), num_retries=None):
107         cache_pos, cache_data = self._readline_cache
108         if self.tell() == cache_pos:
109             data = [cache_data]
110         else:
111             data = ['']
112         data_size = len(data[-1])
113         while (data_size < size) and ('\n' not in data[-1]):
114             next_read = self.read(2 ** 20, num_retries=num_retries)
115             if not next_read:
116                 break
117             data.append(next_read)
118             data_size += len(next_read)
119         data = ''.join(data)
120         try:
121             nextline_index = data.index('\n') + 1
122         except ValueError:
123             nextline_index = len(data)
124         nextline_index = min(nextline_index, size)
125         self._readline_cache = (self.tell(), data[nextline_index:])
126         return data[:nextline_index]
127
128     @_FileLikeObjectBase._before_close
129     @retry_method
130     def decompress(self, decompress, size, num_retries=None):
131         for segment in self.readall(size, num_retries=num_retries):
132             data = decompress(segment)
133             if data:
134                 yield data
135
136     @_FileLikeObjectBase._before_close
137     @retry_method
138     def readall_decompressed(self, size=2**20, num_retries=None):
139         self.seek(0)
140         if self.name.endswith('.bz2'):
141             dc = bz2.BZ2Decompressor()
142             return self.decompress(dc.decompress, size,
143                                    num_retries=num_retries)
144         elif self.name.endswith('.gz'):
145             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
146             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
147                                    size, num_retries=num_retries)
148         else:
149             return self.readall(size, num_retries=num_retries)
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def readlines(self, sizehint=float('inf'), num_retries=None):
154         data = []
155         data_size = 0
156         for s in self.readall(num_retries=num_retries):
157             data.append(s)
158             data_size += len(s)
159             if data_size >= sizehint:
160                 break
161         return ''.join(data).splitlines(True)
162
163     def size(self):
164         raise NotImplementedError()
165
166     def read(self, size, num_retries=None):
167         raise NotImplementedError()
168
169     def readfrom(self, start, size, num_retries=None):
170         raise NotImplementedError()
171
172
173 class StreamFileReader(ArvadosFileReaderBase):
174     class _NameAttribute(str):
175         # The Python file API provides a plain .name attribute.
176         # Older SDK provided a name() method.
177         # This class provides both, for maximum compatibility.
178         def __call__(self):
179             return self
180
181     def __init__(self, stream, segments, name):
182         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
183         self._stream = stream
184         self.segments = segments
185
186     def stream_name(self):
187         return self._stream.name()
188
189     def size(self):
190         n = self.segments[-1]
191         return n.range_start + n.range_size
192
193     @_FileLikeObjectBase._before_close
194     @retry_method
195     def read(self, size, num_retries=None):
196         """Read up to 'size' bytes from the stream, starting at the current file position"""
197         if size == 0:
198             return ''
199
200         data = ''
201         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
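        # Only the first available chunk is returned, so a read that spans a
        # segment boundary comes back short; callers (e.g. readall/readline
        # above) are expected to call read() again to get the rest.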
202         if available_chunks:
203             lr = available_chunks[0]
204             data = self._stream.readfrom(lr.locator+lr.segment_offset,
205                                           lr.segment_size,
206                                           num_retries=num_retries)
207
208         self._filepos += len(data)
209         return data
210
211     @_FileLikeObjectBase._before_close
212     @retry_method
213     def readfrom(self, start, size, num_retries=None):
214         """Read up to 'size' bytes from the stream, starting at 'start'"""
215         if size == 0:
216             return ''
217
218         data = []
219         for lr in locators_and_ranges(self.segments, start, size):
220             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
221                                               num_retries=num_retries))
222         return ''.join(data)
223
224     def as_manifest(self):
225         segs = []
226         for r in self.segments:
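            # In a StreamFileReader segment, the Range's 'locator' field holds the
            # segment's offset within the stream (as used by self._stream.readfrom
            # in read/readfrom above), not a Keep block locator.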
227             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
228         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
229
230
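# Decorator: execute the wrapped method while holding self.lock, so objects
# such as _BufferBlock, _BlockManager, and ArvadosFile can be shared safely
# between threads.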
231 def synchronized(orig_func):
232     @functools.wraps(orig_func)
233     def synchronized_wrapper(self, *args, **kwargs):
234         with self.lock:
235             return orig_func(self, *args, **kwargs)
236     return synchronized_wrapper
237
238 class _BufferBlock(object):
239     """A stand-in for a Keep block that is in the process of being written.
240
241     Writers can append to it, get the size, and compute the Keep locator.
242     There are three valid states:
243
244     WRITABLE
245       Can append to block.
246
247     PENDING
248       Block is in the process of being uploaded to Keep, append is an error.
249
250     COMMITTED
251       The block has been written to Keep and its internal buffer has been
252       released; reads of the block are now served through the Keep client
253       (the internal copy has been discarded), and identifiers referring to
254       the BufferBlock can be replaced with the block locator.
255
256     """
257
258     WRITABLE = 0
259     PENDING = 1
260     COMMITTED = 2
261
262     def __init__(self, blockid, starting_capacity, owner):
263         """
264         :blockid:
265           the identifier for this block
266
267         :starting_capacity:
268           the initial buffer capacity
269
270         :owner:
271           ArvadosFile that owns this block
272
273         """
274         self.blockid = blockid
275         self.buffer_block = bytearray(starting_capacity)
276         self.buffer_view = memoryview(self.buffer_block)
277         self.write_pointer = 0
278         self._state = _BufferBlock.WRITABLE
279         self._locator = None
280         self.owner = owner
281         self.lock = threading.Lock()
282
283     @synchronized
284     def append(self, data):
285         """Append some data to the buffer.
286
287         Only valid if the block is in WRITABLE state.  Implements an expanding
288         buffer, doubling capacity as needed to accommodate all the data.
289
290         """
291         if self._state == _BufferBlock.WRITABLE:
292             while (self.write_pointer+len(data)) > len(self.buffer_block):
293                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
294                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
295                 self.buffer_block = new_buffer_block
296                 self.buffer_view = memoryview(self.buffer_block)
297             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
298             self.write_pointer += len(data)
299             self._locator = None
300         else:
301             raise AssertionError("Buffer block is not writable")
302
303     @synchronized
304     def set_state(self, nextstate, loc=None):
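        """Transition the block from WRITABLE to PENDING, or from PENDING to
        COMMITTED (in which case `loc` is the Keep locator returned by the
        upload).  Any other transition raises AssertionError."""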
305         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
306             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
307             self._state = nextstate
308             if self._state == _BufferBlock.COMMITTED:
309                 self._locator = loc
310                 self.buffer_view = None
311                 self.buffer_block = None
312         else:
313             raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
314
315     @synchronized
316     def state(self):
317         return self._state
318
319     def size(self):
320         """The amount of data written to the buffer."""
321         return self.write_pointer
322
323     @synchronized
324     def locator(self):
325         """The Keep locator for this buffer's contents."""
326         if self._locator is None:
327             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
328         return self._locator
329
330     @synchronized
331     def clone(self, new_blockid, owner):
332         if self._state == _BufferBlock.COMMITTED:
333             raise AssertionError("Can only duplicate a writable or pending buffer block")
334         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
335         bufferblock.append(self.buffer_view[0:self.size()])
336         return bufferblock
337
338     @synchronized
339     def clear(self):
340         self.owner = None
341         self.buffer_block = None
342         self.buffer_view = None
343
344
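# A no-op stand-in for threading.Lock, used where real locking is not needed.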
345 class NoopLock(object):
346     def __enter__(self):
347         return self
348
349     def __exit__(self, exc_type, exc_value, traceback):
350         pass
351
352     def acquire(self, blocking=False):
353         pass
354
355     def release(self):
356         pass
357
358
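# Decorator: raise IOError(EROFS) unless self.writable() is true, i.e. the
# underlying collection can be modified.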
359 def must_be_writable(orig_func):
360     @functools.wraps(orig_func)
361     def must_be_writable_wrapper(self, *args, **kwargs):
362         if not self.writable():
363             raise IOError(errno.EROFS, "Collection must be writable.")
364         return orig_func(self, *args, **kwargs)
365     return must_be_writable_wrapper
366
367
368 class _BlockManager(object):
369     """BlockManager handles buffer blocks.
370
371     Also handles background block uploads, and background block prefetch for a
372     Collection of ArvadosFiles.
373
374     """
375     def __init__(self, keep):
376         """keep: KeepClient object to use"""
377         self._keep = keep
378         self._bufferblocks = {}
379         self._put_queue = None
380         self._put_errors = None
381         self._put_threads = None
382         self._prefetch_queue = None
383         self._prefetch_threads = None
384         self.lock = threading.Lock()
385         self.prefetch_enabled = True
386         self.num_put_threads = 2
387         self.num_get_threads = 2
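        # Upload and prefetch worker threads are created lazily: put threads on
        # the first asynchronous commit_bufferblock() call, get threads on the
        # first block_prefetch() call.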
388
389     @synchronized
390     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
391         """Allocate a new, empty bufferblock in WRITABLE state and return it.
392
393         :blockid:
394           optional block identifier, otherwise one will be automatically assigned
395
396         :starting_capacity:
397           optional capacity, otherwise will use default capacity
398
399         :owner:
400           ArvadosFile that owns this block
401
402         """
403         if blockid is None:
404             blockid = "bufferblock%i" % len(self._bufferblocks)
405         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
406         self._bufferblocks[bufferblock.blockid] = bufferblock
407         return bufferblock
408
409     @synchronized
410     def dup_block(self, block, owner):
411         """Create a new bufferblock initialized with the content of an existing bufferblock.
412
413         :block:
414           the buffer block to copy.
415
416         :owner:
417           ArvadosFile that owns the new block
418
419         """
420         new_blockid = "bufferblock%i" % len(self._bufferblocks)
421         bufferblock = block.clone(new_blockid, owner)
422         self._bufferblocks[bufferblock.blockid] = bufferblock
423         return bufferblock
424
425     @synchronized
426     def is_bufferblock(self, locator):
427         return locator in self._bufferblocks
428
429     @synchronized
430     def stop_threads(self):
431         """Shut down and wait for background upload and download threads to finish."""
432
433         if self._put_threads is not None:
434             for t in self._put_threads:
435                 self._put_queue.put(None)
436             for t in self._put_threads:
437                 t.join()
438         self._put_threads = None
439         self._put_queue = None
440         self._put_errors = None
441
442         if self._prefetch_threads is not None:
443             for t in self._prefetch_threads:
444                 self._prefetch_queue.put(None)
445             for t in self._prefetch_threads:
446                 t.join()
447         self._prefetch_threads = None
448         self._prefetch_queue = None
449
450     def commit_bufferblock(self, block, wait):
451         """Initiate a background upload of a bufferblock.
452
453         :block:
454           The block object to upload
455
456         :wait:
457           If `wait` is True, upload the block synchronously.
458           If `wait` is False, upload the block asynchronously.  This will
459           return immediately unless the upload queue is at capacity, in
460           which case it will block until an upload queue slot is available.
461
462         """
463
464         def commit_bufferblock_worker(self):
465             """Background uploader thread."""
466
467             while True:
468                 try:
469                     bufferblock = self._put_queue.get()
470                     if bufferblock is None:
471                         return
472
473                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
474                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
475
476                 except Exception as e:
477                     self._put_errors.put((bufferblock.locator(), e))
478                 finally:
479                     if self._put_queue is not None:
480                         self._put_queue.task_done()
481
482         if block.state() != _BufferBlock.WRITABLE:
483             return
484
485         if wait:
486             block.set_state(_BufferBlock.PENDING)
487             loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
488             block.set_state(_BufferBlock.COMMITTED, loc)
489         else:
490             with self.lock:
491                 if self._put_threads is None:
492                     # Start uploader threads.
493
494                     # If we don't limit the Queue size, the upload queue can quickly
495                     # grow to take up gigabytes of RAM if the writing process is
496                     # generating data more quickly than it can be sent to the Keep
497                     # servers.
498                     #
499                     # With two upload threads and a queue size of 2, this means up to 4
500                     # blocks pending.  If they are full 64 MiB blocks, that means up to
501                     # 256 MiB of internal buffering, which is the same size as the
502                     # default download block cache in KeepClient.
503                     self._put_queue = Queue.Queue(maxsize=2)
504                     self._put_errors = Queue.Queue()
505
506                     self._put_threads = []
507                     for i in xrange(0, self.num_put_threads):
508                         thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
509                         self._put_threads.append(thread)
510                         thread.daemon = True
511                         thread.start()
512
513             # Mark the block as PENDING to disallow any further appends.
514             block.set_state(_BufferBlock.PENDING)
515             self._put_queue.put(block)
516
517     @synchronized
518     def get_bufferblock(self, locator):
519         return self._bufferblocks.get(locator)
520
521     @synchronized
522     def delete_bufferblock(self, locator):
523         bb = self._bufferblocks[locator]
524         bb.clear()
525         del self._bufferblocks[locator]
526
527     def get_block_contents(self, locator, num_retries, cache_only=False):
528         """Fetch a block.
529
530         First check whether the locator refers to a BufferBlock; if so, return
531         its contents.  Otherwise, pass the request through to KeepClient.get().
532
533         """
534         with self.lock:
535             if locator in self._bufferblocks:
536                 bufferblock = self._bufferblocks[locator]
537                 if bufferblock.state() != _BufferBlock.COMMITTED:
538                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
539                 else:
540                     locator = bufferblock._locator
541         if cache_only:
542             return self._keep.get_from_cache(locator)
543         else:
544             return self._keep.get(locator, num_retries=num_retries)
545
546     def commit_all(self):
547         """Commit all outstanding buffer blocks.
548
549         Unlike commit_bufferblock(), this is a synchronous call, and will not
550         return until all buffer blocks are uploaded.  Raises
551         KeepWriteError() if any blocks failed to upload.
552
553         """
554         with self.lock:
555             items = self._bufferblocks.items()
556
557         for k,v in items:
558             if v.state() == _BufferBlock.WRITABLE:
559                 v.owner.flush(False)
560
561         with self.lock:
562             if self._put_queue is not None:
563                 self._put_queue.join()
564
565                 if not self._put_errors.empty():
566                     err = []
567                     try:
568                         while True:
569                             err.append(self._put_errors.get(False))
570                     except Queue.Empty:
571                         pass
572                     raise KeepWriteError("Error writing some blocks", err, label="block")
573
574         for k,v in items:
575             # flush again with wait=True to remove committed bufferblocks from
576             # the segments.
577             v.owner.flush(True)
578
579
580     def block_prefetch(self, locator):
581         """Initiate a background download of a block.
582
583         This assumes that the underlying KeepClient implements a block cache,
584         so repeated requests for the same block will not result in repeated
585         downloads (unless the block is evicted from the cache).  This method
586         does not block.
587
588         """
589
590         if not self.prefetch_enabled:
591             return
592
593         def block_prefetch_worker(self):
594             """The background downloader thread."""
595             while True:
596                 try:
597                     b = self._prefetch_queue.get()
598                     if b is None:
599                         return
600                     self._keep.get(b)
601                 except Exception:
602                     pass
603
604         with self.lock:
605             if locator in self._bufferblocks:
606                 return
607             if self._prefetch_threads is None:
608                 self._prefetch_queue = Queue.Queue()
609                 self._prefetch_threads = []
610                 for i in xrange(0, self.num_get_threads):
611                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
612                     self._prefetch_threads.append(thread)
613                     thread.daemon = True
614                     thread.start()
615         self._prefetch_queue.put(locator)
616
617
618 class ArvadosFile(object):
619     """Represent a file in a Collection.
620
621     ArvadosFile manages the underlying representation of a file in Keep as a
622     sequence of segments spanning a set of blocks, and implements random
623     read/write access.
624
625     This object may be accessed from multiple threads.
626
627     """
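    # ArvadosFile is normally reached through a Collection rather than
    # instantiated directly.  A rough usage sketch, assuming the Collection API
    # in collection.py (not exercised in this module itself):
    #
    #   import arvados.collection
    #   c = arvados.collection.Collection()
    #   with c.open("notes.txt", "w") as f:   # f is an ArvadosFileWriter
    #       f.write("hello\n")
    #   with c.open("notes.txt", "r") as f:   # f is an ArvadosFileReader
    #       print f.read(128)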
628
629     def __init__(self, parent, name, stream=[], segments=[]):
630         """
631         ArvadosFile constructor.
632
633         :stream:
634           a list of Range objects representing a block stream
635
636         :segments:
637           a list of Range objects representing segments
638         """
639         self.parent = parent
640         self.name = name
641         self._modified = True
642         self._segments = []
643         self.lock = parent.root_collection().lock
644         for s in segments:
645             self._add_segment(stream, s.locator, s.range_size)
646         self._current_bblock = None
647
648     def writable(self):
649         return self.parent.writable()
650
651     @synchronized
652     def segments(self):
653         return copy.copy(self._segments)
654
655     @synchronized
656     def clone(self, new_parent, new_name):
657         """Make a copy of this file."""
658         cp = ArvadosFile(new_parent, new_name)
659         cp.replace_contents(self)
660         return cp
661
662     @must_be_writable
663     @synchronized
664     def replace_contents(self, other):
665         """Replace segments of this file with segments from another `ArvadosFile` object."""
666
667         map_loc = {}
668         self._segments = []
669         for other_segment in other.segments():
670             new_loc = other_segment.locator
671             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
672                 if other_segment.locator not in map_loc:
673                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
674                     if bufferblock.state() != _BufferBlock.WRITABLE:
675                         map_loc[other_segment.locator] = bufferblock.locator()
676                     else:
677                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
678                 new_loc = map_loc[other_segment.locator]
679
680             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
681
682         self._modified = True
683
684     def __eq__(self, other):
685         if other is self:
686             return True
687         if not isinstance(other, ArvadosFile):
688             return False
689
690         othersegs = other.segments()
691         with self.lock:
692             if len(self._segments) != len(othersegs):
693                 return False
694             for i in xrange(0, len(othersegs)):
695                 seg1 = self._segments[i]
696                 seg2 = othersegs[i]
697                 loc1 = seg1.locator
698                 loc2 = seg2.locator
699
700                 if self.parent._my_block_manager().is_bufferblock(loc1):
701                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
702
703                 if other.parent._my_block_manager().is_bufferblock(loc2):
704                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
705
706                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
707                     seg1.range_start != seg2.range_start or
708                     seg1.range_size != seg2.range_size or
709                     seg1.segment_offset != seg2.segment_offset):
710                     return False
711
712         return True
713
714     def __ne__(self, other):
715         return not self.__eq__(other)
716
717     @synchronized
718     def set_unmodified(self):
719         """Clear the modified flag"""
720         self._modified = False
721
722     @synchronized
723     def modified(self):
724         """Test the modified flag"""
725         return self._modified
726
727     @must_be_writable
728     @synchronized
729     def truncate(self, size):
730         """Shrink the size of the file.
731
732         If `size` is less than the size of the file, the file contents after
733         `size` will be discarded.  If `size` is greater than the current size
734         of the file, an IOError will be raised.
735
736         """
737         if size < self.size():
738             new_segs = []
739             for r in self._segments:
740                 range_end = r.range_start+r.range_size
741                 if r.range_start >= size:
742                     # segment starts past the truncate size, all done
743                     break
744                 elif size < range_end:
745                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
746                     nr.segment_offset = r.segment_offset
747                     new_segs.append(nr)
748                     break
749                 else:
750                     new_segs.append(r)
751
752             self._segments = new_segs
753             self._modified = True
754         elif size > self.size():
755             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
756
757     def readfrom(self, offset, size, num_retries, exact=False):
758         """Read up to `size` bytes from the file starting at `offset`.
759
760         :exact:
761          If False (default), return less data than requested if the read
762          crosses a block boundary and the next block isn't cached.  If True,
763          only return less data than requested when hitting EOF.
764         """
765
766         with self.lock:
767             if size == 0 or offset >= self.size():
768                 return ''
769             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
770             readsegs = locators_and_ranges(self._segments, offset, size)
771
772         for lr in prefetch:
773             self.parent._my_block_manager().block_prefetch(lr.locator)
774
775         data = []
776         for lr in readsegs:
777             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
778             if block:
779                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
780             else:
781                 break
782         return ''.join(data)
783
784     def _repack_writes(self, num_retries):
785         """Check whether the buffer block holds more data than the segments reference.
786
787         This happens when a buffered write overwrites a file range written by
788         a previous buffered write.  Re-pack the buffer block for efficiency
789         and to avoid leaking information.
790
791         """
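        # Example: two successive writeto(0, "x" * 10, ...) calls leave 20 bytes
        # in the buffer block, but the file's segments only reference the last 10,
        # so write_total (10) ends up smaller than the buffer block's size (20).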
792         segs = self._segments
793
794         # Sum up the segments to get the total bytes of the file referencing
795         # into the buffer block.
796         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
797         write_total = sum([s.range_size for s in bufferblock_segs])
798
799         if write_total < self._current_bblock.size():
800             # There is more data in the buffer block than is accounted for by
801             # segments, so re-pack just the referenced data into a new buffer block.
802             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
803             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
804             for t in bufferblock_segs:
805                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
806                 t.segment_offset = new_bb.size() - t.range_size
807
808             self._current_bblock = new_bb
809
810     @must_be_writable
811     @synchronized
812     def writeto(self, offset, data, num_retries):
813         """Write `data` to the file starting at `offset`.
814
815         This will update existing bytes and/or extend the size of the file as
816         necessary.
817
818         """
819         if len(data) == 0:
820             return
821
822         if offset > self.size():
823             raise ArgumentError("Offset is past the end of the file")
824
825         if len(data) > config.KEEP_BLOCK_SIZE:
826             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
827
828         self._modified = True
829
830         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
831             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
832
833         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
834             self._repack_writes(num_retries)
835             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
836                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
837                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
838
839         self._current_bblock.append(data)
840
841         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
842
843         self.parent.notify(MOD, self.parent, self.name, (self, self))
844
845         return len(data)
846
847     @synchronized
848     def flush(self, wait=True, num_retries=0):
849         """Flush bufferblocks to Keep."""
850         if self.modified():
851             if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
852                 self._repack_writes(num_retries)
853                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
854             if wait:
855                 to_delete = set()
856                 for s in self._segments:
857                     bb = self.parent._my_block_manager().get_bufferblock(s.locator)
858                     if bb:
859                         if bb.state() != _BufferBlock.COMMITTED:
860                             _logger.error("bufferblock %s is not committed" % (s.locator))
861                         else:
862                             to_delete.add(s.locator)
863                             s.locator = bb.locator()
864                 for s in to_delete:
865                    self.parent._my_block_manager().delete_bufferblock(s)
866
867             self.parent.notify(MOD, self.parent, self.name, (self, self))
868
869     @must_be_writable
870     @synchronized
871     def add_segment(self, blocks, pos, size):
872         """Add a segment to the end of the file.
873
874         `pos` and `size` reference a section of the stream described by
875         `blocks` (a list of Range objects).
876
877         """
878         self._add_segment(blocks, pos, size)
879
880     def _add_segment(self, blocks, pos, size):
881         """Internal implementation of add_segment."""
882         self._modified = True
883         for lr in locators_and_ranges(blocks, pos, size):
884             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
885             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
886             self._segments.append(r)
887
888     @synchronized
889     def size(self):
890         """Get the file size."""
891         if self._segments:
892             n = self._segments[-1]
893             return n.range_start + n.range_size
894         else:
895             return 0
896
897     @synchronized
898     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
899         buf = ""
900         filestream = []
901         for segment in self._segments:
902             loc = segment.locator
903             if self.parent._my_block_manager().is_bufferblock(loc):
904                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
905             if portable_locators:
906                 loc = KeepLocator(loc).stripped()
907             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
908                                  segment.segment_offset, segment.range_size))
909         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
910         buf += "\n"
911         return buf
912
913     @must_be_writable
914     @synchronized
915     def _reparent(self, newparent, newname):
916         self._modified = True
917         self.flush()
918         self.parent.remove(self.name)
919         self.parent = newparent
920         self.name = newname
921         self.lock = self.parent.root_collection().lock
922
923
924 class ArvadosFileReader(ArvadosFileReaderBase):
925     """Wraps ArvadosFile in a file-like object supporting reading only.
926
927     Be aware that this class is NOT thread safe as there is no locking around
928     updating the file pointer.
929
930     """
931
932     def __init__(self, arvadosfile, mode="r", num_retries=None):
933         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
934         self.arvadosfile = arvadosfile
935
936     def size(self):
937         return self.arvadosfile.size()
938
939     def stream_name(self):
940         return self.arvadosfile.parent.stream_name()
941
942     @_FileLikeObjectBase._before_close
943     @retry_method
944     def read(self, size=None, num_retries=None):
945         """Read up to `size` bytes from the file and return the result.
946
947         Starts at the current file position.  If `size` is None, read the
948         entire remainder of the file.
949         """
950         if size is None:
951             data = []
952             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
953             while rd:
954                 data.append(rd)
955                 self._filepos += len(rd)
956                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
957             return ''.join(data)
958         else:
959             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
960             self._filepos += len(data)
961             return data
962
963     @_FileLikeObjectBase._before_close
964     @retry_method
965     def readfrom(self, offset, size, num_retries=None):
966         """Read up to `size` bytes from the stream, starting at the specified file offset.
967
968         This method does not change the file position.
969         """
970         return self.arvadosfile.readfrom(offset, size, num_retries)
971
972     def flush(self):
973         pass
974
975
976 class ArvadosFileWriter(ArvadosFileReader):
977     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
978
979     Be aware that this class is NOT thread safe as there is no locking around
980     updating the file pointer.
981
982     """
983
984     def __init__(self, arvadosfile, mode, num_retries=None):
985         super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
986
987     @_FileLikeObjectBase._before_close
988     @retry_method
989     def write(self, data, num_retries=None):
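        """Write `data` to the file.

        In append mode ("a"), data is always written at the current end of the
        file and the file pointer is not moved; otherwise data is written at the
        current file position, which then advances past the written data.
        """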
990         if self.mode[0] == "a":
991             self.arvadosfile.writeto(self.size(), data, num_retries)
992         else:
993             self.arvadosfile.writeto(self._filepos, data, num_retries)
994             self._filepos += len(data)
995         return len(data)
996
997     @_FileLikeObjectBase._before_close
998     @retry_method
999     def writelines(self, seq, num_retries=None):
1000         for s in seq:
1001             self.write(s, num_retries)
1002
1003     @_FileLikeObjectBase._before_close
1004     def truncate(self, size=None):
1005         if size is None:
1006             size = self._filepos
1007         self.arvadosfile.truncate(size)
1008         if self._filepos > self.size():
1009             self._filepos = self.size()
1010
1011     @_FileLikeObjectBase._before_close
1012     def flush(self):
1013         self.arvadosfile.flush()
1014
1015     def close(self):
1016         if not self.closed:
1017             self.flush()
1018             super(ArvadosFileWriter, self).close()