14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range
18 from .retry import retry_method
23 _logger = logging.getLogger('arvados.arvfile')
26 """split(path) -> stream_name, file_name
28 Separate the stream name and file name in a /-separated stream path and
29 return a tuple (stream_name, file_name). If no stream name is available, it defaults to '.'.
34 stream_name, file_name = path.rsplit('/', 1)
35 except ValueError: # No / in string
36 stream_name, file_name = '.', path
37 return stream_name, file_name
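# Illustrative behaviour of split(), with hypothetical file names:
#
#     >>> split('md5sum/chr10.fasta')
#     ('md5sum', 'chr10.fasta')
#     >>> split('chr10.fasta')      # no '/' in the path, so the stream defaults to '.'
#     ('.', 'chr10.fasta')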
39 class _FileLikeObjectBase(object):
40 def __init__(self, name, mode):
46 def _before_close(orig_func):
47 @functools.wraps(orig_func)
48 def before_close_wrapper(self, *args, **kwargs):
50 raise ValueError("I/O operation on closed stream file")
51 return orig_func(self, *args, **kwargs)
52 return before_close_wrapper
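# Minimal sketch of how the decorator is applied (ExampleReader is hypothetical;
# the closed-flag bookkeeping lives in the elided parts of _FileLikeObjectBase):
#
#     class ExampleReader(_FileLikeObjectBase):
#         @_FileLikeObjectBase._before_close
#         def peek(self):
#             return 'ok'
#
#     r = ExampleReader('example.txt', 'rb')
#     r.peek()      # -> 'ok'
#     r.close()
#     r.peek()      # -> ValueError: I/O operation on closed stream file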
57 def __exit__(self, exc_type, exc_value, traceback):
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69 def __init__(self, name, mode, num_retries=None):
70 super(ArvadosFileReaderBase, self).__init__(name, mode)
72 self.num_retries = num_retries
73 self._readline_cache = (None, None)
77 data = self.readline()
82 def decompressed_name(self):
83 return re.sub(r'\.(bz2|gz)$', '', self.name)
85 @_FileLikeObjectBase._before_close
86 def seek(self, pos, whence=os.SEEK_SET):
87 if whence == os.SEEK_CUR:
89 elif whence == os.SEEK_END:
91 self._filepos = min(max(pos, 0L), self.size())
96 @_FileLikeObjectBase._before_close
98 def readall(self, size=2**20, num_retries=None):
100 data = self.read(size, num_retries=num_retries)
105 @_FileLikeObjectBase._before_close
107 def readline(self, size=float('inf'), num_retries=None):
108 cache_pos, cache_data = self._readline_cache
109 if self.tell() == cache_pos:
113 data_size = len(data[-1])
114 while (data_size < size) and ('\n' not in data[-1]):
115 next_read = self.read(2 ** 20, num_retries=num_retries)
118 data.append(next_read)
119 data_size += len(next_read)
122 nextline_index = data.index('\n') + 1
124 nextline_index = len(data)
125 nextline_index = min(nextline_index, size)
126 self._readline_cache = (self.tell(), data[nextline_index:])
127 return data[:nextline_index]
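# Sketch of the readline cache in use (reader is any ArvadosFileReaderBase
# subclass; the names here are placeholders):
#
#     reader.seek(0)
#     header = reader.readline()    # reads ahead in 1 MiB chunks, caches the remainder
#     body = reader.readline()      # may be served from self._readline_cache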
129 @_FileLikeObjectBase._before_close
131 def decompress(self, decompress, size, num_retries=None):
132 for segment in self.readall(size, num_retries=num_retries):
133 data = decompress(segment)
137 @_FileLikeObjectBase._before_close
139 def readall_decompressed(self, size=2**20, num_retries=None):
141 if self.name.endswith('.bz2'):
142 dc = bz2.BZ2Decompressor()
143 return self.decompress(dc.decompress, size,
144 num_retries=num_retries)
145 elif self.name.endswith('.gz'):
146 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
147 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
148 size, num_retries=num_retries)
150 return self.readall(size, num_retries=num_retries)
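# Sketch: stream a compressed file without holding it all in memory
# (process() stands in for caller code):
#
#     for chunk in reader.readall_decompressed(size=2**20):
#         process(chunk)   # decompressor is picked from the .gz / .bz2 extension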
152 @_FileLikeObjectBase._before_close
154 def readlines(self, sizehint=float('inf'), num_retries=None):
157 for s in self.readall(num_retries=num_retries):
160 if data_size >= sizehint:
162 return ''.join(data).splitlines(True)
165 raise NotImplementedError()
167 def read(self, size, num_retries=None):
168 raise NotImplementedError()
170 def readfrom(self, start, size, num_retries=None):
171 raise NotImplementedError()
174 class StreamFileReader(ArvadosFileReaderBase):
175 class _NameAttribute(str):
176 # The Python file API provides a plain .name attribute.
177 # Older SDK provided a name() method.
178 # This class provides both, for maximum compatibility.
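# Both access styles are intended to work on a StreamFileReader (sketch; the
# callable behaviour is in the elided body of _NameAttribute):
#
#     f.name       # plain attribute, as in the standard file API
#     f.name()     # legacy callable form from the older SDK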
182 def __init__(self, stream, segments, name):
183 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
184 self._stream = stream
185 self.segments = segments
187 def stream_name(self):
188 return self._stream.name()
191 n = self.segments[-1]
192 return n.range_start + n.range_size
194 @_FileLikeObjectBase._before_close
196 def read(self, size, num_retries=None):
197 """Read up to 'size' bytes from the stream, starting at the current file position"""
202 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
204 lr = available_chunks[0]
205 data = self._stream.readfrom(lr.locator+lr.segment_offset,
207 num_retries=num_retries)
209 self._filepos += len(data)
212 @_FileLikeObjectBase._before_close
214 def readfrom(self, start, size, num_retries=None):
215 """Read up to 'size' bytes from the stream, starting at 'start'"""
220 for lr in locators_and_ranges(self.segments, start, size):
221 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
222 num_retries=num_retries))
225 def as_manifest(self):
227 for r in self.segments:
228 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
229 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
232 def synchronized(orig_func):
233 @functools.wraps(orig_func)
234 def synchronized_wrapper(self, *args, **kwargs):
236 return orig_func(self, *args, **kwargs)
237 return synchronized_wrapper
240 class StateChangeError(Exception):
241 def __init__(self, message, state, nextstate):
242 super(StateChangeError, self).__init__(message)
244 self.nextstate = nextstate
246 class _BufferBlock(object):
247 """A stand-in for a Keep block that is in the process of being written.
249 Writers can append to it, get the size, and compute the Keep locator.
250 There are three valid states:
256 Block is in the process of being uploaded to Keep; append is an error.
259 The block has been written to Keep, its internal buffer has been
260 released, fetching the block will fetch it via keep client (since we
261 discarded the internal copy), and identifiers referring to the BufferBlock
262 can be replaced with the block locator.
271 def __init__(self, blockid, starting_capacity, owner):
274 the identifier for this block
277 the initial buffer capacity
280 ArvadosFile that owns this block
283 self.blockid = blockid
284 self.buffer_block = bytearray(starting_capacity)
285 self.buffer_view = memoryview(self.buffer_block)
286 self.write_pointer = 0
287 self._state = _BufferBlock.WRITABLE
290 self.lock = threading.Lock()
291 self.wait_for_commit = threading.Event()
295 def append(self, data):
296 """Append some data to the buffer.
298 Only valid if the block is in WRITABLE state. Implements an expanding
299 buffer, doubling capacity as needed to accommodate all the data.
302 if self._state == _BufferBlock.WRITABLE:
303 while (self.write_pointer+len(data)) > len(self.buffer_block):
304 new_buffer_block = bytearray(len(self.buffer_block) * 2)
305 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
306 self.buffer_block = new_buffer_block
307 self.buffer_view = memoryview(self.buffer_block)
308 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
309 self.write_pointer += len(data)
312 raise AssertionError("Buffer block is not writable")
314 STATE_TRANSITIONS = frozenset([
316 (PENDING, COMMITTED),
321 def set_state(self, nextstate, val=None):
322 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
323 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
324 self._state = nextstate
326 if self._state == _BufferBlock.PENDING:
327 self.wait_for_commit.clear()
329 if self._state == _BufferBlock.COMMITTED:
331 self.buffer_view = None
332 self.buffer_block = None
333 self.wait_for_commit.set()
335 if self._state == _BufferBlock.ERROR:
337 self.wait_for_commit.set()
344 """The amount of data written to the buffer."""
345 return self.write_pointer
349 """The Keep locator for this buffer's contents."""
350 if self._locator is None:
351 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
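# Sketch of a buffer block's write-side lifecycle while WRITABLE (the blockid
# and owner here are placeholders; real blocks come from _BlockManager):
#
#     bb = _BufferBlock('bufferblock0', starting_capacity=2**14, owner=None)
#     bb.append('foo')
#     bb.append('bar' * 10000)   # buffer capacity doubles as needed
#     bb.size()                  # -> 30003 bytes written so far
#     bb.locator()               # -> '<md5 of current contents>+30003'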
355 def clone(self, new_blockid, owner):
356 if self._state == _BufferBlock.COMMITTED:
357 raise AssertionError("Cannot duplicate committed buffer block")
358 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
359 bufferblock.append(self.buffer_view[0:self.size()])
365 self.buffer_block = None
366 self.buffer_view = None
369 class NoopLock(object):
373 def __exit__(self, exc_type, exc_value, traceback):
376 def acquire(self, blocking=False):
383 def must_be_writable(orig_func):
384 @functools.wraps(orig_func)
385 def must_be_writable_wrapper(self, *args, **kwargs):
386 if not self.writable():
387 raise IOError(errno.EROFS, "Collection is read-only.")
388 return orig_func(self, *args, **kwargs)
389 return must_be_writable_wrapper
392 class _BlockManager(object):
393 """BlockManager handles buffer blocks.
395 Also handles background block uploads, and background block prefetch for a
396 Collection of ArvadosFiles.
400 DEFAULT_PUT_THREADS = 2
401 DEFAULT_GET_THREADS = 2
403 def __init__(self, keep):
404 """keep: KeepClient object to use"""
406 self._bufferblocks = {}
407 self._put_queue = None
408 self._put_threads = None
409 self._prefetch_queue = None
410 self._prefetch_threads = None
411 self.lock = threading.Lock()
412 self.prefetch_enabled = True
413 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
414 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
417 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
418 """Allocate a new, empty bufferblock in WRITABLE state and return it.
421 optional block identifier, otherwise one will be automatically assigned
424 optional capacity, otherwise will use default capacity
427 ArvadosFile that owns this block
431 blockid = "bufferblock%i" % len(self._bufferblocks)
432 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
433 self._bufferblocks[bufferblock.blockid] = bufferblock
437 def dup_block(self, block, owner):
438 """Create a new bufferblock initialized with the content of an existing bufferblock.
441 the buffer block to copy.
444 ArvadosFile that owns the new block
447 new_blockid = "bufferblock%i" % len(self._bufferblocks)
448 bufferblock = block.clone(new_blockid, owner)
449 self._bufferblocks[bufferblock.blockid] = bufferblock
453 def is_bufferblock(self, locator):
454 return locator in self._bufferblocks
456 def _commit_bufferblock_worker(self):
457 """Background uploader thread."""
461 bufferblock = self._put_queue.get()
462 if bufferblock is None:
465 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
466 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
468 except Exception as e:
469 bufferblock.set_state(_BufferBlock.ERROR, e)
471 if self._put_queue is not None:
472 self._put_queue.task_done()
475 def start_put_threads(self):
476 if self._put_threads is None:
477 # Start uploader threads.
479 # If we don't limit the Queue size, the upload queue can quickly
480 # grow to take up gigabytes of RAM if the writing process is
481 generating data more quickly than it can be sent to the Keep
484 # With two upload threads and a queue size of 2, this means up to 4
485 # blocks pending. If they are full 64 MiB blocks, that means up to
486 # 256 MiB of internal buffering, which is the same size as the
487 # default download block cache in KeepClient.
488 self._put_queue = Queue.Queue(maxsize=2)
490 self._put_threads = []
491 for i in xrange(0, self.num_put_threads):
492 thread = threading.Thread(target=self._commit_bufferblock_worker)
493 self._put_threads.append(thread)
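# Back-of-the-envelope capacity math for the bounded queue above, assuming
# full 64 MiB blocks:
#
#     in_flight = _BlockManager.DEFAULT_PUT_THREADS    # 2 blocks being uploaded
#     queued = 2                                        # Queue(maxsize=2)
#     (in_flight + queued) * 64                         # -> 256 MiB of write buffering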
497 def _block_prefetch_worker(self):
498 """The background downloader thread."""
501 b = self._prefetch_queue.get()
509 def start_get_threads(self):
510 if self._prefetch_threads is None:
511 self._prefetch_queue = Queue.Queue()
512 self._prefetch_threads = []
513 for i in xrange(0, self.num_get_threads):
514 thread = threading.Thread(target=self._block_prefetch_worker)
515 self._prefetch_threads.append(thread)
521 def stop_threads(self):
522 """Shut down and wait for background upload and download threads to finish."""
524 if self._put_threads is not None:
525 for t in self._put_threads:
526 self._put_queue.put(None)
527 for t in self._put_threads:
529 self._put_threads = None
530 self._put_queue = None
532 if self._prefetch_threads is not None:
533 for t in self._prefetch_threads:
534 self._prefetch_queue.put(None)
535 for t in self._prefetch_threads:
537 self._prefetch_threads = None
538 self._prefetch_queue = None
543 def __exit__(self, exc_type, exc_value, traceback):
546 def commit_bufferblock(self, block, sync):
547 """Initiate a background upload of a bufferblock.
550 The block object to upload
553 If `sync` is True, upload the block synchronously.
554 If `sync` is False, upload the block asynchronously. This will
555 return immediately unless the upload queue is at capacity, in
556 which case it will wait on an upload queue slot.
561 # Mark the block as PENDING so as to disallow any more appends.
562 block.set_state(_BufferBlock.PENDING)
563 except StateChangeError as e:
564 if e.state == _BufferBlock.PENDING:
566 block.wait_for_commit.wait()
569 if block.state() == _BufferBlock.COMMITTED:
571 elif block.state() == _BufferBlock.ERROR:
578 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
579 block.set_state(_BufferBlock.COMMITTED, loc)
580 except Exception as e:
581 block.set_state(_BufferBlock.ERROR, e)
584 self.start_put_threads()
585 self._put_queue.put(block)
588 def get_bufferblock(self, locator):
589 return self._bufferblocks.get(locator)
592 def delete_bufferblock(self, locator):
593 bb = self._bufferblocks[locator]
595 del self._bufferblocks[locator]
597 def get_block_contents(self, locator, num_retries, cache_only=False):
600 First checks to see if the locator is a BufferBlock and returns that; if
601 not, passes the request through to KeepClient.get().
605 if locator in self._bufferblocks:
606 bufferblock = self._bufferblocks[locator]
607 if bufferblock.state() != _BufferBlock.COMMITTED:
608 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
610 locator = bufferblock._locator
612 return self._keep.get_from_cache(locator)
614 return self._keep.get(locator, num_retries=num_retries)
616 def commit_all(self):
617 """Commit all outstanding buffer blocks.
619 This is a synchronous call, and will not return until all buffer blocks
620 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
624 items = self._bufferblocks.items()
627 if v.state() != _BufferBlock.COMMITTED:
628 v.owner.flush(sync=False)
631 if self._put_queue is not None:
632 self._put_queue.join()
636 if v.state() == _BufferBlock.ERROR:
637 err.append((v.locator(), v.error))
639 raise KeepWriteError("Error writing some blocks", err, label="block")
642 # flush again with sync=True to remove committed bufferblocks from
645 v.owner.flush(sync=True)
647 def block_prefetch(self, locator):
648 """Initiate a background download of a block.
650 This assumes that the underlying KeepClient implements a block cache,
651 so repeated requests for the same block will not result in repeated
652 downloads (unless the block is evicted from the cache.) This method
657 if not self.prefetch_enabled:
660 if self._keep.get_from_cache(locator) is not None:
664 if locator in self._bufferblocks:
667 self.start_get_threads()
668 self._prefetch_queue.put(locator)
671 class ArvadosFile(object):
672 """Represent a file in a Collection.
674 ArvadosFile manages the underlying representation of a file in Keep as a
675 sequence of segments spanning a set of blocks, and implements random
678 This object may be accessed from multiple threads.
682 def __init__(self, parent, name, stream=[], segments=[]):
684 ArvadosFile constructor.
687 a list of Range objects representing a block stream
690 a list of Range objects representing segments
694 self._committed = False
696 self.lock = parent.root_collection().lock
698 self._add_segment(stream, s.locator, s.range_size)
699 self._current_bblock = None
702 return self.parent.writable()
706 return copy.copy(self._segments)
709 def clone(self, new_parent, new_name):
710 """Make a copy of this file."""
711 cp = ArvadosFile(new_parent, new_name)
712 cp.replace_contents(self)
717 def replace_contents(self, other):
718 """Replace segments of this file with segments from another `ArvadosFile` object."""
722 for other_segment in other.segments():
723 new_loc = other_segment.locator
724 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
725 if other_segment.locator not in map_loc:
726 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
727 if bufferblock.state() != _BufferBlock.WRITABLE:
728 map_loc[other_segment.locator] = bufferblock.locator()
730 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
731 new_loc = map_loc[other_segment.locator]
733 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
735 self._committed = False
737 def __eq__(self, other):
740 if not isinstance(other, ArvadosFile):
743 othersegs = other.segments()
745 if len(self._segments) != len(othersegs):
747 for i in xrange(0, len(othersegs)):
748 seg1 = self._segments[i]
753 if self.parent._my_block_manager().is_bufferblock(loc1):
754 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
756 if other.parent._my_block_manager().is_bufferblock(loc2):
757 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
759 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
760 seg1.range_start != seg2.range_start or
761 seg1.range_size != seg2.range_size or
762 seg1.segment_offset != seg2.segment_offset):
767 def __ne__(self, other):
768 return not self.__eq__(other)
771 def set_committed(self):
772 """Set committed flag to True"""
773 self._committed = True
777 """Get whether this is committed or not."""
778 return self._committed
782 def truncate(self, size):
783 """Shrink the size of the file.
785 If `size` is less than the size of the file, the file contents after
786 `size` will be discarded. If `size` is greater than the current size
787 of the file, an IOError will be raised.
790 if size < self.size():
792 for r in self._segments:
793 range_end = r.range_start+r.range_size
794 if r.range_start >= size:
795 # segment is past the truncate size, all done
797 elif size < range_end:
798 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
799 nr.segment_offset = r.segment_offset
805 self._segments = new_segs
806 self._committed = False
807 elif size > self.size():
808 raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
810 def readfrom(self, offset, size, num_retries, exact=False):
811 """Read up to `size` bytes from the file starting at `offset`.
814 If False (default), return less data than requested if the read
815 crosses a block boundary and the next block isn't cached. If True,
816 only return less data than requested when hitting EOF.
820 if size == 0 or offset >= self.size():
822 readsegs = locators_and_ranges(self._segments, offset, size)
823 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
828 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
830 blockview = memoryview(block)
831 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
837 if lr.locator not in locs:
838 self.parent._my_block_manager().block_prefetch(lr.locator)
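# Sketch of the `exact` flag's effect (af is an ArvadosFile; sizes are placeholders):
#
#     af.readfrom(0, 2**26, num_retries=1)               # may return a short read at an
#                                                        #   uncached block boundary
#     af.readfrom(0, 2**26, num_retries=1, exact=True)   # only short at end of file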
843 def _repack_writes(self, num_retries):
844 """Check whether the buffer block holds more data than its segments reference.
846 This happens when a buffered write over-writes a file range written in
847 a previous buffered write. Re-pack the buffer block for efficiency
848 and to avoid leaking information.
851 segs = self._segments
853 # Sum up the segments to get the total bytes of the file referencing
854 # into the buffer block.
855 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
856 write_total = sum([s.range_size for s in bufferblock_segs])
858 if write_total < self._current_bblock.size():
859 # There is more data in the buffer block than is actually accounted for by segments, so
860 # re-pack into a new buffer by copying over to a new buffer block.
861 contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
862 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
863 for t in bufferblock_segs:
864 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
865 t.segment_offset = new_bb.size() - t.range_size
867 self._current_bblock = new_bb
871 def writeto(self, offset, data, num_retries):
872 """Write `data` to the file starting at `offset`.
874 This will update existing bytes and/or extend the size of the file as necessary.
881 if offset > self.size():
882 raise ArgumentError("Offset is past the end of the file")
884 if len(data) > config.KEEP_BLOCK_SIZE:
885 # Chunk it up into smaller writes
887 dataview = memoryview(data)
889 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
890 n += config.KEEP_BLOCK_SIZE
893 self._committed = False
895 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
896 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
898 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
899 self._repack_writes(num_retries)
900 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
901 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
902 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
904 self._current_bblock.append(data)
906 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
908 self.parent.notify(WRITE, self.parent, self.name, (self, self))
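# Oversize writes are split transparently (sketch):
#
#     af.writeto(af.size(), 'x' * (config.KEEP_BLOCK_SIZE + 1), num_retries=0)
#     # handled internally as one full-block write followed by a one-byte write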
913 def flush(self, sync=True, num_retries=0):
914 """Flush the current bufferblock to Keep.
917 If True, commit block synchronously, wait until buffer block has been written.
918 If False, commit block asynchronously, return immediately after putting the block on the upload queue.
924 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
925 if self._current_bblock.state() == _BufferBlock.WRITABLE:
926 self._repack_writes(num_retries)
927 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
931 for s in self._segments:
932 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
934 if bb.state() != _BufferBlock.COMMITTED:
935 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
936 to_delete.add(s.locator)
937 s.locator = bb.locator()
939 self.parent._my_block_manager().delete_bufferblock(s)
941 self.parent.notify(MOD, self.parent, self.name, (self, self))
945 def add_segment(self, blocks, pos, size):
946 """Add a segment to the end of the file.
948 `pos` and `size` reference a section of the stream described by
949 `blocks` (a list of Range objects)
952 self._add_segment(blocks, pos, size)
954 def _add_segment(self, blocks, pos, size):
955 """Internal implementation of add_segment."""
956 self._committed = False
957 for lr in locators_and_ranges(blocks, pos, size):
958 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
959 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
960 self._segments.append(r)
964 """Get the file size."""
966 n = self._segments[-1]
967 return n.range_start + n.range_size
972 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
975 for segment in self._segments:
976 loc = segment.locator
977 if loc.startswith("bufferblock"):
978 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
979 if portable_locators:
980 loc = KeepLocator(loc).stripped()
981 filestream.append(LocatorAndRange(loc, locator_block_size(loc),
982 segment.segment_offset, segment.range_size))
983 buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
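# The returned text is a single manifest stream line, e.g. (illustrative values):
#
#     '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:example.txt\n'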
989 def _reparent(self, newparent, newname):
990 self._committed = False
991 self.flush(sync=True)
992 self.parent.remove(self.name)
993 self.parent = newparent
995 self.lock = self.parent.root_collection().lock
998 class ArvadosFileReader(ArvadosFileReaderBase):
999 """Wraps ArvadosFile in a file-like object supporting reading only.
1001 Be aware that this class is NOT thread safe as there is no locking around
1002 updating the file pointer.
1006 def __init__(self, arvadosfile, num_retries=None):
1007 super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1008 self.arvadosfile = arvadosfile
1011 return self.arvadosfile.size()
1013 def stream_name(self):
1014 return self.arvadosfile.parent.stream_name()
1016 @_FileLikeObjectBase._before_close
1018 def read(self, size=None, num_retries=None):
1019 """Read up to `size` bytes from the file and return the result.
1021 Starts at the current file position. If `size` is None, read the
1022 entire remainder of the file.
1026 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1029 self._filepos += len(rd)
1030 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1031 return ''.join(data)
1033 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1034 self._filepos += len(data)
1037 @_FileLikeObjectBase._before_close
1039 def readfrom(self, offset, size, num_retries=None):
1040 """Read up to `size` bytes from the stream, starting at the specified file offset.
1042 This method does not change the file position.
1044 return self.arvadosfile.readfrom(offset, size, num_retries)
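# Typical read-side usage (sketch; readers are normally handed out by the
# Collection API rather than constructed directly):
#
#     reader = ArvadosFileReader(arvados_file, num_retries=3)
#     reader.size()                    # file length in bytes
#     first_kb = reader.read(1024)     # advances the file position
#     tail = reader.readfrom(reader.size() - 16, 16)   # position unchanged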
1050 class ArvadosFileWriter(ArvadosFileReader):
1051 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1053 Be aware that this class is NOT thread safe as there is no locking around
1054 updating the file pointer.
1058 def __init__(self, arvadosfile, mode, num_retries=None):
1059 super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1062 @_FileLikeObjectBase._before_close
1064 def write(self, data, num_retries=None):
1065 if self.mode[0] == "a":
1066 self.arvadosfile.writeto(self.size(), data, num_retries)
1068 self.arvadosfile.writeto(self._filepos, data, num_retries)
1069 self._filepos += len(data)
1072 @_FileLikeObjectBase._before_close
1074 def writelines(self, seq, num_retries=None):
1076 self.write(s, num_retries=num_retries)
1078 @_FileLikeObjectBase._before_close
1079 def truncate(self, size=None):
1081 size = self._filepos
1082 self.arvadosfile.truncate(size)
1083 if self._filepos > self.size():
1084 self._filepos = self.size()
1086 @_FileLikeObjectBase._before_close
1088 self.arvadosfile.flush()
1093 super(ArvadosFileWriter, self).close()
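# Typical write-side usage (sketch; in practice these objects usually come from
# the Collection open() API rather than being constructed directly):
#
#     writer = ArvadosFileWriter(arvados_file, 'r+', num_retries=3)
#     writer.write('hello world\n')
#     writer.flush()     # push the current bufferblock to Keep
#     writer.close()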