import bz2
import copy
import errno
import functools
import hashlib
import logging
import os
import re
import threading
import zlib
import Queue

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, LocatorAndRange, Range
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
26 """split(path) -> streamname, filename
28 Separate the stream name and file name in a /-separated stream path and
29 return a tuple (stream_name, file_name). If no stream name is available,
34 stream_name, file_name = path.rsplit('/', 1)
35 except ValueError: # No / in string
36 stream_name, file_name = '.', path
37 return stream_name, file_name
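
# Example: everything before the last '/' is the stream name.
#
#   split('data/sub/file.txt')  # -> ('data/sub', 'file.txt')
#   split('file.txt')           # -> ('.', 'file.txt')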


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
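
    # Example (sketch): __iter__ above is built on readline(), so any
    # concrete reader can be consumed line by line like a regular file:
    #
    #   for line in reader:    # 'reader' is a hypothetical subclass instance
    #       handle(line)       # 'handle' is a hypothetical callback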

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect a gzip header.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
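
    # Example (sketch, hypothetical names): readall_decompressed() yields
    # chunks, so a compressed file can be streamed without holding the whole
    # decompressed content in memory:
    #
    #   for chunk in reader.readall_decompressed():
    #       sink.write(chunk)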

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
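
# Example (sketch): synchronized() assumes the instance has a `lock`
# attribute supporting the context-manager protocol (threading.Lock here,
# or NoopLock defined below):
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def increment(self):
#           self.n += 1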


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    @synchronized
    def set_state(self, nextstate, loc=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
            self._state = nextstate
            if self._state == _BufferBlock.COMMITTED:
                self._locator = loc
                self.buffer_view = None
                self.buffer_block = None
        else:
            raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
321 """The amount of data written to the buffer."""
322 return self.write_pointer
326 """The Keep locator for this buffer's contents."""
327 if self._locator is None:
328 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Can only duplicate a writable or pending buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection must be writable.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block, wait):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :wait:
          If `wait` is True, upload the block synchronously.
          If `wait` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """

        def commit_bufferblock_worker(self):
            """Background uploader thread."""
            while True:
                try:
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:
                        return

                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)

                except Exception as e:
                    self._put_errors.put((bufferblock.locator(), e))
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        if block.state() != _BufferBlock.WRITABLE:
            return

        if wait:
            block.set_state(_BufferBlock.PENDING)
            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
            block.set_state(_BufferBlock.COMMITTED, loc)
        else:
            with self.lock:
                if self._put_threads is None:
                    # Start uploader threads.

                    # If we don't limit the Queue size, the upload queue can quickly
                    # grow to take up gigabytes of RAM if the writing process is
                    # generating data more quickly than it can be sent to the Keep
                    # servers.
                    #
                    # With two upload threads and a queue size of 2, this means up to 4
                    # blocks pending.  If they are full 64 MiB blocks, that means up to
                    # 256 MiB of internal buffering, which is the same size as the
                    # default download block cache in KeepClient.
                    self._put_queue = Queue.Queue(maxsize=2)
                    self._put_errors = Queue.Queue()

                    self._put_threads = []
                    for i in xrange(0, self.num_put_threads):
                        thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                        self._put_threads.append(thread)
                        thread.daemon = True
                        thread.start()

            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
            self._put_queue.put(block)
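
    # Example (sketch): wait=True uploads in the calling thread and returns
    # once the block is in Keep; wait=False hands the block to the uploader
    # threads and normally returns immediately:
    #
    #   manager.commit_bufferblock(block, True)   # synchronous
    #   manager.commit_bufferblock(block, False)  # queued; may briefly block
    #                                             # when the 2-slot queue is full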

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that,
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded.  Raises
        KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() == _BufferBlock.WRITABLE:
                v.owner.flush(False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                if not self._put_errors.empty():
                    err = []
                    try:
                        while True:
                            err.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # Flush again with wait=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache).  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        def block_prefetch_worker(self):
            """The background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()
        self._prefetch_queue.put(locator)
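
    # Example (sketch): callers pass locators they expect to read soon; the
    # downloaded data lands in the KeepClient block cache, so a later
    # get_block_contents() call for the same locator should be a cache hit:
    #
    #   manager.block_prefetch(locator)   # returns immediately
    #   data = manager.get_block_contents(locator, num_retries=2)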


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._modified = True

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag"""
        return self._modified

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # Segment is past the truncate size, all done.
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
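
    # Example: with a single 10-byte segment, truncate(4) rewrites the
    # segment list to one Range covering bytes 0-3 of the same block, while
    # truncate(20) raises IOError(EINVAL) because extending the file is not
    # supported.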

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack by copying the referenced data into a
            # new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
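
    # Worked example: writing 10 bytes at offset 0 twice in a row leaves 20
    # bytes in the buffer block while the segments reference only the last
    # 10, so write_total (10) < size() (20) and the block is packed down to
    # the 10 live bytes.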

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
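
    # Example (sketch): appending is just writing at the current end of
    # file; each call buffers the data and updates the segment list:
    #
    #   f.writeto(f.size(), 'hello', num_retries=2)  # append 5 bytes
    #   f.writeto(0, 'H', num_retries=2)             # overwrite byte 0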

    @synchronized
    def flush(self, wait=True, num_retries=0):
        """Flush bufferblocks to Keep."""
        if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
            self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
        if wait:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        _logger.error("bufferblock %s is not committed" % (s.locator))
                    else:
                        to_delete.add(s.locator)
                        s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            # The block size is parsed from the locator's size hint.
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._modified = True
        self.flush(wait=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.flush()
            super(ArvadosFileWriter, self).close()
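

# End-to-end sketch (hypothetical Collection API; collection.open() is
# assumed to return an ArvadosFileWriter for mode "w"):
#
#   with collection.open('out.txt', 'w') as writer:
#       writer.write('hello\n')    # buffered in a _BufferBlock
#   # Leaving the with-block calls close(), which flushes buffer blocks to
#   # Keep and swaps "bufferblock" ids for permanent Keep locators.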