14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
18 from .retry import retry_method
23 _logger = logging.getLogger('arvados.arvfile')
26 """split(path) -> streamname, filename
28 Separate the stream name and file name in a /-separated stream path and
29 return a tuple (stream_name, file_name). If no stream name is available,
34 stream_name, file_name = path.rsplit('/', 1)
35 except ValueError: # No / in string
36 stream_name, file_name = '.', path
37 return stream_name, file_name
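# Illustrative example (editorial addition, not in the original module): with the
# rsplit fallback above, split() behaves like this:
#
#   split("sub/dir/file.txt")  # -> ("sub/dir", "file.txt")
#   split("file.txt")          # -> (".", "file.txt")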
39 class _FileLikeObjectBase(object):
40 def __init__(self, name, mode):
46 def _before_close(orig_func):
47 @functools.wraps(orig_func)
48 def before_close_wrapper(self, *args, **kwargs):
50 raise ValueError("I/O operation on closed stream file")
51 return orig_func(self, *args, **kwargs)
52 return before_close_wrapper
57 def __exit__(self, exc_type, exc_value, traceback):
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69 def __init__(self, name, mode, num_retries=None):
70 super(ArvadosFileReaderBase, self).__init__(name, mode)
72 self.num_retries = num_retries
73 self._readline_cache = (None, None)
77 data = self.readline()
82 def decompressed_name(self):
83 return re.sub(r'\.(bz2|gz)$', '', self.name)
85 @_FileLikeObjectBase._before_close
86 def seek(self, pos, whence=os.SEEK_SET):
87 if whence == os.SEEK_CUR:
89 elif whence == os.SEEK_END:
91 self._filepos = min(max(pos, 0L), self.size())
96 @_FileLikeObjectBase._before_close
98 def readall(self, size=2**20, num_retries=None):
100 data = self.read(size, num_retries=num_retries)
105 @_FileLikeObjectBase._before_close
107 def readline(self, size=float('inf'), num_retries=None):
108 cache_pos, cache_data = self._readline_cache
109 if self.tell() == cache_pos:
113 data_size = len(data[-1])
114 while (data_size < size) and ('\n' not in data[-1]):
115 next_read = self.read(2 ** 20, num_retries=num_retries)
118 data.append(next_read)
119 data_size += len(next_read)
122 nextline_index = data.index('\n') + 1
124 nextline_index = len(data)
125 nextline_index = min(nextline_index, size)
126 self._readline_cache = (self.tell(), data[nextline_index:])
127 return data[:nextline_index]
129 @_FileLikeObjectBase._before_close
131 def decompress(self, decompress, size, num_retries=None):
132 for segment in self.readall(size, num_retries):
133 data = decompress(segment)
137 @_FileLikeObjectBase._before_close
139 def readall_decompressed(self, size=2**20, num_retries=None):
141 if self.name.endswith('.bz2'):
142 dc = bz2.BZ2Decompressor()
143 return self.decompress(dc.decompress, size,
144 num_retries=num_retries)
145 elif self.name.endswith('.gz'):
146 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
147 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
148 size, num_retries=num_retries)
150 return self.readall(size, num_retries=num_retries)
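# Hedged sketch (editorial addition, not from the original source): a reader picks
# a decompressor from the file name suffix, so iterating yields plain chunks.
#
#   for chunk in reader.readall_decompressed():   # 'reader' is a hypothetical
#       handle(chunk)                             # ArvadosFileReaderBase subclass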
152 @_FileLikeObjectBase._before_close
154 def readlines(self, sizehint=float('inf'), num_retries=None):
157 for s in self.readall(num_retries=num_retries):
160 if data_size >= sizehint:
162 return ''.join(data).splitlines(True)
165 raise NotImplementedError()
167 def read(self, size, num_retries=None):
168 raise NotImplementedError()
170 def readfrom(self, start, size, num_retries=None):
171 raise NotImplementedError()
174 class StreamFileReader(ArvadosFileReaderBase):
175 class _NameAttribute(str):
176 # The Python file API provides a plain .name attribute.
177 # Older SDK provided a name() method.
178 # This class provides both, for maximum compatibility.
182 def __init__(self, stream, segments, name):
183 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
184 self._stream = stream
185 self.segments = segments
187 def stream_name(self):
188 return self._stream.name()
191 n = self.segments[-1]
192 return n.range_start + n.range_size
194 @_FileLikeObjectBase._before_close
196 def read(self, size, num_retries=None):
197 """Read up to 'size' bytes from the stream, starting at the current file position"""
202 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
204 lr = available_chunks[0]
205 data = self._stream.readfrom(lr.locator+lr.segment_offset,
207 num_retries=num_retries)
209 self._filepos += len(data)
212 @_FileLikeObjectBase._before_close
214 def readfrom(self, start, size, num_retries=None):
215 """Read up to 'size' bytes from the stream, starting at 'start'"""
220 for lr in locators_and_ranges(self.segments, start, size):
221 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
222 num_retries=num_retries))
225 def as_manifest(self):
227 for r in self.segments:
228 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
229 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
232 def synchronized(orig_func):
233 @functools.wraps(orig_func)
234 def synchronized_wrapper(self, *args, **kwargs):
236 return orig_func(self, *args, **kwargs)
237 return synchronized_wrapper
240 class StateChangeError(Exception):
241 def __init__(self, message, state, nextstate):
242 super(StateChangeError, self).__init__(message)
244 self.nextstate = nextstate
246 class _BufferBlock(object):
247 """A stand-in for a Keep block that is in the process of being written.
249 Writers can append to it, get the size, and compute the Keep locator.
250 There are three valid states:
256 Block is in the process of being uploaded to Keep, append is an error.
259 The block has been written to Keep, its internal buffer has been
260 released, fetching the block will fetch it via keep client (since we
261 discarded the internal copy), and identifiers referring to the BufferBlock
262 can be replaced with the block locator.
271 def __init__(self, blockid, starting_capacity, owner):
274 the identifier for this block
277 the initial buffer capacity
280 ArvadosFile that owns this block
283 self.blockid = blockid
284 self.buffer_block = bytearray(starting_capacity)
285 self.buffer_view = memoryview(self.buffer_block)
286 self.write_pointer = 0
287 self._state = _BufferBlock.WRITABLE
290 self.lock = threading.Lock()
291 self.wait_for_commit = threading.Event()
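# Illustrative sketch (editorial addition, not original code): the lifecycle
# described in the class docstring, as _BlockManager drives it:
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append(b"hello")                      # WRITABLE: buffer grows as needed
#   bb.set_state(_BufferBlock.PENDING)       # queued for upload; appends now fail
#   bb.set_state(_BufferBlock.COMMITTED,     # upload finished; buffer released and
#                "5d41402abc4b2a76b9719d911017c592+5")  # the locator recorded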
295 def append(self, data):
296 """Append some data to the buffer.
298 Only valid if the block is in WRITABLE state. Implements an expanding
299 buffer, doubling capacity as needed to accommodate all the data.
302 if self._state == _BufferBlock.WRITABLE:
303 while (self.write_pointer+len(data)) > len(self.buffer_block):
304 new_buffer_block = bytearray(len(self.buffer_block) * 2)
305 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
306 self.buffer_block = new_buffer_block
307 self.buffer_view = memoryview(self.buffer_block)
308 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
309 self.write_pointer += len(data)
312 raise AssertionError("Buffer block is not writable")
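# Worked example (editorial note, not original code): with len(buffer_block) == 4
# and write_pointer == 3, append(b"hello") needs 8 bytes, so the buffer doubles
# once (4 -> 8); the five new bytes land at offsets 3..7 and write_pointer
# becomes 8.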
314 STATE_TRANSITIONS = frozenset([
316 (PENDING, COMMITTED),
321 def set_state(self, nextstate, val=None):
322 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
323 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
324 self._state = nextstate
326 if self._state == _BufferBlock.PENDING:
327 self.wait_for_commit.clear()
329 if self._state == _BufferBlock.COMMITTED:
331 self.buffer_view = None
332 self.buffer_block = None
333 self.wait_for_commit.set()
335 if self._state == _BufferBlock.ERROR:
337 self.wait_for_commit.set()
344 """The amount of data written to the buffer."""
345 return self.write_pointer
349 """The Keep locator for this buffer's contents."""
350 if self._locator is None:
351 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
355 def clone(self, new_blockid, owner):
356 if self._state == _BufferBlock.COMMITTED:
357 raise AssertionError("Cannot duplicate committed buffer block")
358 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
359 bufferblock.append(self.buffer_view[0:self.size()])
365 self.buffer_block = None
366 self.buffer_view = None
369 class NoopLock(object):
373 def __exit__(self, exc_type, exc_value, traceback):
376 def acquire(self, blocking=False):
383 def must_be_writable(orig_func):
384 @functools.wraps(orig_func)
385 def must_be_writable_wrapper(self, *args, **kwargs):
386 if not self.writable():
387 raise IOError(errno.EROFS, "Collection is read-only.")
388 return orig_func(self, *args, **kwargs)
389 return must_be_writable_wrapper
392 class _BlockManager(object):
393 """BlockManager handles buffer blocks.
395 Also handles background block uploads and background block prefetch for a
396 Collection of ArvadosFiles.
400 DEFAULT_PUT_THREADS = 2
401 DEFAULT_GET_THREADS = 2
403 def __init__(self, keep):
404 """keep: KeepClient object to use"""
406 self._bufferblocks = {}
407 self._put_queue = None
408 self._put_threads = None
409 self._prefetch_queue = None
410 self._prefetch_threads = None
411 self.lock = threading.Lock()
412 self.prefetch_enabled = True
413 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
414 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
417 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
418 """Allocate a new, empty bufferblock in WRITABLE state and return it.
421 optional block identifier, otherwise one will be automatically assigned
424 optional capacity, otherwise will use default capacity
427 ArvadosFile that owns this block
431 blockid = "bufferblock%i" % len(self._bufferblocks)
432 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
433 self._bufferblocks[bufferblock.blockid] = bufferblock
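# Hedged usage sketch (editorial addition, not from the original source;
# 'keep_client' is assumed to be a KeepClient instance):
#
#   bm = _BlockManager(keep_client)
#   bb = bm.alloc_bufferblock(owner=None)    # auto-assigned id "bufferblock0"
#   bb.append(b"some data")
#   bm.commit_bufferblock(bb, sync=True)     # uploads and moves it to COMMITTED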
437 def dup_block(self, block, owner):
438 """Create a new bufferblock initialized with the content of an existing bufferblock.
441 the buffer block to copy.
444 ArvadosFile that owns the new block
447 new_blockid = "bufferblock%i" % len(self._bufferblocks)
448 bufferblock = block.clone(new_blockid, owner)
449 self._bufferblocks[bufferblock.blockid] = bufferblock
453 def is_bufferblock(self, locator):
454 return locator in self._bufferblocks
456 def _commit_bufferblock_worker(self):
457 """Background uploader thread."""
461 bufferblock = self._put_queue.get()
462 if bufferblock is None:
465 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
466 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
468 except Exception as e:
469 bufferblock.set_state(_BufferBlock.ERROR, e)
471 if self._put_queue is not None:
472 self._put_queue.task_done()
475 def start_put_threads(self):
476 if self._put_threads is None:
477 # Start uploader threads.
479 # If we don't limit the Queue size, the upload queue can quickly
480 # grow to take up gigabytes of RAM if the writing process is
481 generating data more quickly than it can be sent to the Keep
484 # With two upload threads and a queue size of 2, this means up to 4
485 # blocks pending. If they are full 64 MiB blocks, that means up to
486 # 256 MiB of internal buffering, which is the same size as the
487 # default download block cache in KeepClient.
488 self._put_queue = Queue.Queue(maxsize=2)
490 self._put_threads = []
491 for i in xrange(0, self.num_put_threads):
492 thread = threading.Thread(target=self._commit_bufferblock_worker)
493 self._put_threads.append(thread)
494 thread.daemon = True
497 def _block_prefetch_worker(self):
498 """The background downloader thread."""
501 b = self._prefetch_queue.get()
509 def start_get_threads(self):
510 if self._prefetch_threads is None:
511 self._prefetch_queue = Queue.Queue()
512 self._prefetch_threads = []
513 for i in xrange(0, self.num_get_threads):
514 thread = threading.Thread(target=self._block_prefetch_worker)
515 self._prefetch_threads.append(thread)
521 def stop_threads(self):
522 """Shut down and wait for background upload and download threads to finish."""
524 if self._put_threads is not None:
525 for t in self._put_threads:
526 self._put_queue.put(None)
527 for t in self._put_threads:
529 self._put_threads = None
530 self._put_queue = None
532 if self._prefetch_threads is not None:
533 for t in self._prefetch_threads:
534 self._prefetch_queue.put(None)
535 for t in self._prefetch_threads:
537 self._prefetch_threads = None
538 self._prefetch_queue = None
543 def __exit__(self, exc_type, exc_value, traceback):
549 def commit_bufferblock(self, block, sync):
550 """Initiate a background upload of a bufferblock.
553 The block object to upload
556 If `sync` is True, upload the block synchronously.
557 If `sync` is False, upload the block asynchronously. This will
558 return immediately unless the upload queue is at capacity, in
559 which case it will wait on an upload queue slot.
564 # Mark the block as PENDING to disallow any further appends.
565 block.set_state(_BufferBlock.PENDING)
566 except StateChangeError as e:
567 if e.state == _BufferBlock.PENDING and sync:
568 block.wait_for_commit.wait()
569 if block.state() == _BufferBlock.ERROR:
575 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
576 block.set_state(_BufferBlock.COMMITTED, loc)
577 except Exception as e:
578 block.set_state(_BufferBlock.ERROR, e)
581 self.start_put_threads()
582 self._put_queue.put(block)
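# Editorial sketch of the two paths above (not original code):
#
#   bm.commit_bufferblock(bb, sync=True)    # keep.put() runs in this thread;
#                                           # raises on failure
#   bm.commit_bufferblock(bb, sync=False)   # block is queued; a background worker
#                                           # calls keep.put(), and this call only
#                                           # blocks while the upload queue
#                                           # (maxsize=2) is full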
585 def get_bufferblock(self, locator):
586 return self._bufferblocks.get(locator)
589 def delete_bufferblock(self, locator):
590 bb = self._bufferblocks[locator]
592 del self._bufferblocks[locator]
594 def get_block_contents(self, locator, num_retries, cache_only=False):
597 First checks whether the locator refers to a BufferBlock and returns its
598 contents if so; otherwise passes the request through to KeepClient.get().
602 if locator in self._bufferblocks:
603 bufferblock = self._bufferblocks[locator]
604 if bufferblock.state() != _BufferBlock.COMMITTED:
605 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
607 locator = bufferblock._locator
609 return self._keep.get_from_cache(locator)
611 return self._keep.get(locator, num_retries=num_retries)
613 def commit_all(self):
614 """Commit all outstanding buffer blocks.
616 This is a synchronous call, and will not return until all buffer blocks
617 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
621 items = self._bufferblocks.items()
624 if v.state() != _BufferBlock.COMMITTED:
625 v.owner.flush(sync=False)
628 if self._put_queue is not None:
629 self._put_queue.join()
633 if v.state() == _BufferBlock.ERROR:
634 err.append((v.locator(), v.error))
636 raise KeepWriteError("Error writing some blocks", err, label="block")
639 # flush again with sync=True to remove committed bufferblocks from
642 v.owner.flush(sync=True)
644 def block_prefetch(self, locator):
645 """Initiate a background download of a block.
647 This assumes that the underlying KeepClient implements a block cache,
648 so repeated requests for the same block will not result in repeated
649 downloads (unless the block is evicted from the cache). This method
654 if not self.prefetch_enabled:
657 if self._keep.get_from_cache(locator) is not None:
661 if locator in self._bufferblocks:
664 self.start_get_threads()
665 self._prefetch_queue.put(locator)
668 class ArvadosFile(object):
669 """Represent a file in a Collection.
671 ArvadosFile manages the underlying representation of a file in Keep as a
672 sequence of segments spanning a set of blocks, and implements random
675 This object may be accessed from multiple threads.
679 def __init__(self, parent, name, stream=[], segments=[]):
681 ArvadosFile constructor.
684 a list of Range objects representing a block stream
687 a list of Range objects representing segments
691 self._committed = False
693 self.lock = parent.root_collection().lock
695 self._add_segment(stream, s.locator, s.range_size)
696 self._current_bblock = None
699 return self.parent.writable()
703 return copy.copy(self._segments)
706 def clone(self, new_parent, new_name):
707 """Make a copy of this file."""
708 cp = ArvadosFile(new_parent, new_name)
709 cp.replace_contents(self)
714 def replace_contents(self, other):
715 """Replace segments of this file with segments from another `ArvadosFile` object."""
719 for other_segment in other.segments():
720 new_loc = other_segment.locator
721 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
722 if other_segment.locator not in map_loc:
723 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
724 if bufferblock.state() != _BufferBlock.WRITABLE:
725 map_loc[other_segment.locator] = bufferblock.locator()
727 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
728 new_loc = map_loc[other_segment.locator]
730 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
732 self._committed = False
734 def __eq__(self, other):
737 if not isinstance(other, ArvadosFile):
740 othersegs = other.segments()
742 if len(self._segments) != len(othersegs):
744 for i in xrange(0, len(othersegs)):
745 seg1 = self._segments[i]
750 if self.parent._my_block_manager().is_bufferblock(loc1):
751 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
753 if other.parent._my_block_manager().is_bufferblock(loc2):
754 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
756 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
757 seg1.range_start != seg2.range_start or
758 seg1.range_size != seg2.range_size or
759 seg1.segment_offset != seg2.segment_offset):
764 def __ne__(self, other):
765 return not self.__eq__(other)
768 def set_committed(self):
769 """Set committed flag to True"""
770 self._committed = True
774 """Get whether this is committed or not."""
775 return self._committed
779 def truncate(self, size):
780 """Shrink the size of the file.
782 If `size` is less than the size of the file, the file contents after
783 `size` will be discarded. If `size` is greater than the current size
784 of the file, an IOError will be raised.
787 if size < self.size():
789 for r in self._segments:
790 range_end = r.range_start+r.range_size
791 if r.range_start >= size:
792 # segment is past the truncate size, all done
794 elif size < range_end:
795 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
796 nr.segment_offset = r.segment_offset
802 self._segments = new_segs
803 self._committed = False
804 elif size > self.size():
805 raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
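# Example behaviour (editorial note, not original code): for a 100-byte file,
# truncate(10) keeps the first 10 bytes and marks the file uncommitted;
# truncate(200) raises IOError(EINVAL) because extending is not supported.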
807 def readfrom(self, offset, size, num_retries, exact=False):
808 """Read up to `size` bytes from the file starting at `offset`.
811 If False (default), return less data than requested if the read
812 crosses a block boundary and the next block isn't cached. If True,
813 only return less data than requested when hitting EOF.
817 if size == 0 or offset >= self.size():
819 readsegs = locators_and_ranges(self._segments, offset, size)
820 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
825 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
827 blockview = memoryview(block)
828 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
834 if lr.locator not in locs:
835 self.parent._my_block_manager().block_prefetch(lr.locator)
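# Editorial note on `exact` (not original code; 'av_file' is a hypothetical
# ArvadosFile instance):
#
#   av_file.readfrom(0, 2**26, num_retries=3)              # may return fewer bytes
#                                                           # at an uncached block edge
#   av_file.readfrom(0, 2**26, num_retries=3, exact=True)  # short only at EOF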
840 def _repack_writes(self, num_retries):
841 """Test if the buffer block has more data than is referenced by the file's segments.
843 This happens when a buffered write over-writes a file range written in
844 a previous buffered write. Re-pack the buffer block for efficiency
845 and to avoid leaking information.
848 segs = self._segments
850 # Sum up the segments to get the total bytes of the file referencing
851 # into the buffer block.
852 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
853 write_total = sum([s.range_size for s in bufferblock_segs])
855 if write_total < self._current_bblock.size():
856 # There is more data in the buffer block than is actually accounted for by segments, so
857 # re-pack into a new buffer by copying over to a new buffer block.
858 contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
859 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
860 for t in bufferblock_segs:
861 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
862 t.segment_offset = new_bb.size() - t.range_size
864 self._current_bblock = new_bb
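# Worked example (editorial note, not original code): if a file writes 10 bytes
# and then overwrites 6 of them, the buffer block holds 16 appended bytes while
# the segments reference only 10; write_total (10) < size (16), so the 10 live
# bytes are copied into a fresh buffer block and the 6 stale bytes are dropped.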
868 def writeto(self, offset, data, num_retries):
869 """Write `data` to the file starting at `offset`.
871 This will update existing bytes and/or extend the size of the file as
878 if offset > self.size():
879 raise ArgumentError("Offset is past the end of the file")
881 if len(data) > config.KEEP_BLOCK_SIZE:
882 # Chunk it up into smaller writes
884 dataview = memoryview(data)
886 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
887 n += config.KEEP_BLOCK_SIZE
890 self._committed = False
892 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
893 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
895 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
896 self._repack_writes(num_retries)
897 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
898 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
899 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
901 self._current_bblock.append(data)
903 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
905 self.parent.notify(WRITE, self.parent, self.name, (self, self))
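# Editorial sketch of the chunking path above (not original code): a single
# writeto() larger than KEEP_BLOCK_SIZE (64 MiB) is split into KEEP_BLOCK_SIZE
# slices, each handled by a recursive writeto() call, so no buffer block ever
# needs to grow past one Keep block.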
910 def flush(self, sync=True, num_retries=0):
911 """Flush the current bufferblock to Keep.
914 If True, commit block synchronously, wait until buffer block has been written.
915 If False, commit block asynchronously, return immediately after putting block into
921 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
922 if self._current_bblock.state() == _BufferBlock.WRITABLE:
923 self._repack_writes(num_retries)
924 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
928 for s in self._segments:
929 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
931 if bb.state() != _BufferBlock.COMMITTED:
932 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
933 to_delete.add(s.locator)
934 s.locator = bb.locator()
936 self.parent._my_block_manager().delete_bufferblock(s)
938 self.parent.notify(MOD, self.parent, self.name, (self, self))
942 def add_segment(self, blocks, pos, size):
943 """Add a segment to the end of the file.
945 `pos` and `size` reference a section of the stream described by
946 `blocks` (a list of Range objects)
949 self._add_segment(blocks, pos, size)
951 def _add_segment(self, blocks, pos, size):
952 """Internal implementation of add_segment."""
953 self._committed = False
954 for lr in locators_and_ranges(blocks, pos, size):
955 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
956 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
957 self._segments.append(r)
961 """Get the file size."""
963 n = self._segments[-1]
964 return n.range_start + n.range_size
969 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
972 for segment in self._segments:
973 loc = segment.locator
974 if self.parent._my_block_manager().is_bufferblock(loc):
975 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
976 if portable_locators:
977 loc = KeepLocator(loc).stripped()
978 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
979 segment.segment_offset, segment.range_size))
980 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
986 def _reparent(self, newparent, newname):
987 self._committed = False
988 self.flush(sync=True)
989 self.parent.remove(self.name)
990 self.parent = newparent
992 self.lock = self.parent.root_collection().lock
995 class ArvadosFileReader(ArvadosFileReaderBase):
996 """Wraps ArvadosFile in a file-like object supporting reading only.
998 Be aware that this class is NOT thread safe: there is no locking around
999 updates to the file pointer.
1003 def __init__(self, arvadosfile, num_retries=None):
1004 super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1005 self.arvadosfile = arvadosfile
1008 return self.arvadosfile.size()
1010 def stream_name(self):
1011 return self.arvadosfile.parent.stream_name()
1013 @_FileLikeObjectBase._before_close
1015 def read(self, size=None, num_retries=None):
1016 """Read up to `size` bytes from the file and return the result.
1018 Starts at the current file position. If `size` is None, read the
1019 entire remainder of the file.
1023 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1026 self._filepos += len(rd)
1027 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1028 return ''.join(data)
1030 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1031 self._filepos += len(data)
1034 @_FileLikeObjectBase._before_close
1036 def readfrom(self, offset, size, num_retries=None):
1037 """Read up to `size` bytes from the stream, starting at the specified file offset.
1039 This method does not change the file position.
1041 return self.arvadosfile.readfrom(offset, size, num_retries)
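# Hedged usage sketch (editorial addition, not from the original source;
# 'av_file' is a hypothetical ArvadosFile, e.g. obtained from a Collection):
#
#   reader = ArvadosFileReader(av_file)
#   header = reader.read(1024)          # advances the file position
#   blob = reader.readfrom(0, 1024)     # does not move the file position
#   reader.close()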
1047 class ArvadosFileWriter(ArvadosFileReader):
1048 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1050 Be aware that this class is NOT thread safe: there is no locking around
1051 updates to the file pointer.
1055 def __init__(self, arvadosfile, mode, num_retries=None):
1056 super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1059 @_FileLikeObjectBase._before_close
1061 def write(self, data, num_retries=None):
1062 if self.mode[0] == "a":
1063 self.arvadosfile.writeto(self.size(), data, num_retries)
1065 self.arvadosfile.writeto(self._filepos, data, num_retries)
1066 self._filepos += len(data)
1069 @_FileLikeObjectBase._before_close
1071 def writelines(self, seq, num_retries=None):
1073 self.write(s, num_retries)
1075 @_FileLikeObjectBase._before_close
1076 def truncate(self, size=None):
1078 size = self._filepos
1079 self.arvadosfile.truncate(size)
1080 if self._filepos > self.size():
1081 self._filepos = self.size()
1083 @_FileLikeObjectBase._before_close
1085 self.arvadosfile.flush()
1090 super(ArvadosFileWriter, self).close()
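# Hedged usage sketch (editorial addition; 'av_file' is a hypothetical writable
# ArvadosFile whose parent collection provides the block manager):
#
#   writer = ArvadosFileWriter(av_file, "w")
#   writer.write("hello world\n")   # buffered in a _BufferBlock
#   writer.truncate(5)              # keep only "hello"
#   writer.flush()                  # commit the buffer block to Keep
#   writer.close()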