import functools
import os
import zlib
import bz2
import config
import hashlib
import threading
import Queue
import copy
import errno
import re
import logging

from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name

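# For example, split("params/stats.txt") returns ("params", "stats.txt"),
# while a bare file name such as split("stats.txt") falls back to the default
# stream and returns (".", "stats.txt").
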
class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep; appending is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via the Keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    @synchronized
    def set_state(self, nextstate, loc=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
            self._state = nextstate
            if self._state == _BufferBlock.COMMITTED:
                self._locator = loc
                self.buffer_view = None
                self.buffer_block = None
        else:
            raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Can only duplicate a writable or pending buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection must be writable.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block, wait):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :wait:
          If `wait` is True, upload the block synchronously.
          If `wait` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """

        def commit_bufferblock_worker(self):
            """Background uploader thread."""

            while True:
                try:
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:
                        return

                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)

                except Exception as e:
                    self._put_errors.put((bufferblock.locator(), e))
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        if block.state() != _BufferBlock.WRITABLE:
            return

        if wait:
            block.set_state(_BufferBlock.PENDING)
            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
            block.set_state(_BufferBlock.COMMITTED, loc)
        else:
            with self.lock:
                if self._put_threads is None:
                    # Start uploader threads.

                    # If we don't limit the Queue size, the upload queue can quickly
                    # grow to take up gigabytes of RAM if the writing process is
                    # generating data more quickly than it can be sent to the Keep
                    # servers.
                    #
                    # With two upload threads and a queue size of 2, this means up to 4
                    # blocks pending.  If they are full 64 MiB blocks, that means up to
                    # 256 MiB of internal buffering, which is the same size as the
                    # default download block cache in KeepClient.
                    self._put_queue = Queue.Queue(maxsize=2)
                    self._put_errors = Queue.Queue()

                    self._put_threads = []
                    for i in xrange(0, self.num_put_threads):
                        thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                        self._put_threads.append(thread)
                        thread.daemon = True
                        thread.start()

            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns its
        contents if so; otherwise, passes the request through to
        KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded.  Raises
        KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() == _BufferBlock.WRITABLE:
                v.owner.flush(False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                if not self._put_errors.empty():
                    err = []
                    try:
                        while True:
                            err.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with wait=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(True)


    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        def block_prefetch_worker(self):
            """The background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()
        self._prefetch_queue.put(locator)

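# A minimal sketch of the buffer block lifecycle through _BlockManager, for
# illustration only; here `keep` stands for a KeepClient instance and `afile`
# for the ArvadosFile that owns the block (both hypothetical names):
#
#   bm = _BlockManager(keep)
#   bb = bm.alloc_bufferblock(owner=afile)   # new block in WRITABLE state
#   bb.append("some data")                   # buffered in memory
#   bm.commit_bufferblock(bb, wait=True)     # PENDING, then COMMITTED in Keep
#   bm.commit_all()                          # flush any remaining blocks
#   bm.stop_threads()                        # shut down background workers
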

class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._modified = True

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag"""
        return self._modified

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, wait=True, num_retries=0):
        """Flush bufferblocks to Keep."""
        if self.modified():
            if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
            if wait:
                to_delete = set()
                for s in self._segments:
                    bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                    if bb:
                        if bb.state() != _BufferBlock.COMMITTED:
                            _logger.error("bufferblock %s is not committed" % (s.locator))
                        else:
                            to_delete.add(s.locator)
                            s.locator = bb.locator()
                for s in to_delete:
                    self.parent._my_block_manager().delete_bufferblock(s)

            self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._modified = True
        self.flush()
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread-safe, as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread-safe, as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.flush()
            super(ArvadosFileWriter, self).close()
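

# Example usage (an illustrative sketch, not part of this module): these
# reader/writer classes are normally obtained from a Collection rather than
# constructed directly.  Assuming the SDK's arvados.collection.Collection
# class and its open(path, mode) method:
#
#   import arvados.collection
#
#   c = arvados.collection.Collection()
#   with c.open("hello.txt", "w") as f:   # ArvadosFileWriter
#       f.write("hello world\n")
#   with c.open("hello.txt", "r") as f:   # ArvadosFileReader
#       print f.read(64)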