import functools
import os
import zlib
import bz2
import config
import hashlib
import threading
import Queue
import copy
import errno
import re
import logging

from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"

_logger = logging.getLogger('arvados.arvfile')
25 """split(path) -> streamname, filename
27 Separate the stream name and file name in a /-separated stream path and
28 return a tuple (stream_name, file_name). If no stream name is available,
33 stream_name, file_name = path.rsplit('/', 1)
34 except ValueError: # No / in string
35 stream_name, file_name = '.', path
36 return stream_name, file_name
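
# Illustrative examples of split() behavior (not part of the original source):
#
#     split("alice/motif.fa")    # -> ("alice", "motif.fa")
#     split("a/b/motif.fa")      # -> ("a/b", "motif.fa")  (rsplit keeps subdirs)
#     split("motif.fa")          # -> (".", "motif.fa")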

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data
81 def decompressed_name(self):
82 return re.sub('\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
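
    # Typical readline() usage, for illustration (`reader` stands in for any
    # concrete ArvadosFileReaderBase subclass instance):
    #
    #     line = reader.readline()          # up to and including the next '\n'
    #     short = reader.readline(size=10)  # at most 10 bytes
    #
    # The (position, leftover) cache lets sequential readline() calls reuse
    # bytes already fetched past the previous newline.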

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
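
    # Illustrative sketch: streaming the decompressed contents of a ".gz"
    # file (`reader` is a hypothetical instance whose name ends in ".gz"):
    #
    #     for chunk in reader.readall_decompressed():
    #         handle(chunk)   # `handle` is a placeholder
    #
    # Names without a .bz2/.gz suffix fall through to plain readall().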

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position."""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'."""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state. Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
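
    # Illustrative capacity math for append() above: with the default 2**14
    # (16 KiB) starting capacity, growing a block to 20 KiB doubles the buffer
    # once to 32 KiB. Doubling keeps total copying linear in the number of
    # bytes appended (amortized O(1) per byte).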

    @synchronized
    def set_state(self, nextstate, loc=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
            self._state = nextstate
            if self._state == _BufferBlock.COMMITTED:
                self._locator = loc
                self.buffer_view = None
                self.buffer_block = None
        else:
            raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))

    @synchronized
    def state(self):
        return self._state
320 """The amount of data written to the buffer."""
321 return self.write_pointer
325 """The Keep locator for this buffer's contents."""
326 if self._locator is None:
327 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Can only duplicate a writable or pending buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection must be writable.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads and background block prefetch for a
    Collection of ArvadosFiles.

    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block, wait):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :wait:
          If `wait` is True, upload the block synchronously.
          If `wait` is False, upload the block asynchronously. This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """

        def commit_bufferblock_worker(self):
            """Background uploader thread."""
            while True:
                try:
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:
                        return
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)
                except Exception as e:
                    self._put_errors.put((bufferblock.locator(), e))
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        if block.state() != _BufferBlock.WRITABLE:
            return

        if wait:
            block.set_state(_BufferBlock.PENDING)
            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
            block.set_state(_BufferBlock.COMMITTED, loc)
        else:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending. If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)
                self._put_errors = Queue.Queue()

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
            self._put_queue.put(block)
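
    # The two commit modes, for illustration (`mgr` and `bb` stand in for a
    # _BlockManager and a WRITABLE _BufferBlock):
    #
    #     mgr.commit_bufferblock(bb, True)   # blocks until the Keep PUT finishes
    #     mgr.commit_bufferblock(bb, False)  # hands bb to the uploader threads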

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded. Raises
        KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() == _BufferBlock.WRITABLE:
                v.owner.flush(False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                if not self._put_errors.empty():
                    err = []
                    try:
                        while True:
                            err.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # Flush again with wait=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache). This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        def block_prefetch_worker(self):
            """The background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

        self._prefetch_queue.put(locator)
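
    # Illustrative use: ArvadosFile.readfrom() below prefetches the locators
    # just past the requested range, e.g.
    #
    #     mgr.block_prefetch("37b51d194a7513e45b56f6524f2d51f2+3")
    #
    # which warms the KeepClient block cache without blocking the caller.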

class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._modified = True

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_unmodified(self):
        """Clear the modified flag."""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag."""
        return self._modified

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded. If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
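
    # Worked example of the segment arithmetic above (illustrative): a 10-byte
    # file backed by one Range(loc, 0, 10) truncated to 4 bytes keeps a single
    # Range(loc, 0, 4); truncate(20) on the same file raises IOError because
    # extending is not supported.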

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached. If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)
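
    # Read semantics sketch (illustrative): with exact=False, a read spanning
    # two blocks may return only the first block's bytes if the second block
    # is not yet in the cache; with exact=True it keeps fetching and only
    # returns short at end-of-file.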

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write. Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
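
    # Illustrative scenario for _repack_writes(): writeto(0, "aaaa", 0)
    # followed by writeto(0, "bbbb", 0) leaves 8 bytes in the buffer block but
    # only 4 referenced by segments; repacking copies out just the live
    # "bbbb" before the block is committed.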

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)
        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
        self.parent.notify(MOD, self.parent, self.name, (self, self))
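
    # Usage sketch (illustrative; `af` is a writable ArvadosFile):
    #
    #     af.writeto(0, "hello", 2)            # overwrite at the start
    #     af.writeto(af.size(), " world", 2)   # append at EOF
    #     af.writeto(af.size() + 1, "!", 2)    # raises ArgumentError (past EOF)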

    @synchronized
    def flush(self, wait=True, num_retries=0):
        """Flush bufferblocks to Keep."""
        if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
            self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)

        if wait:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb is not None:
                    if bb.state() != _BufferBlock.COMMITTED:
                        _logger.error("bufferblock %s is not committed" % (s.locator))
                    else:
                        to_delete.add(s.locator)
                        s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects).

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
890 """Get the file size."""
892 n = self._segments[-1]
893 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf
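
    # For reference, the text returned for a one-segment file has this shape
    # (locator made up for illustration):
    #
    #     . 37b51d194a7513e45b56f6524f2d51f2+3 0:3:foo.txt\n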

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._modified = True
        self.flush()
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position. If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
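
    # Usage sketch (illustrative; `af` is an existing ArvadosFile):
    #
    #     reader = ArvadosFileReader(af)
    #     head = reader.read(1024)                       # advances the file position
    #     tail = reader.readfrom(af.size() - 16, 16, 3)  # position unchanged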

class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.flush()
            super(ArvadosFileWriter, self).close()
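
# Writer usage sketch (illustrative; `af` is a writable ArvadosFile, and the
# context-manager support comes from _FileLikeObjectBase above):
#
#     with ArvadosFileWriter(af, "r+") as writer:
#         writer.write("hello\n")
#         writer.truncate()   # truncate at the current file position
#     # close() flushes any remaining buffer blocks to Keep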