14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range
18 from .retry import retry_method
23 _logger = logging.getLogger('arvados.arvfile')
26 """split(path) -> streamname, filename
28 Separate the stream name and file name in a /-separated stream path and
29 return a tuple (stream_name, file_name). If no stream name is available,
34 stream_name, file_name = path.rsplit('/', 1)
35 except ValueError: # No / in string
36 stream_name, file_name = '.', path
37 return stream_name, file_name
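# Editor's note: a minimal illustrative sketch (not part of the original
# module) showing how split() behaves on typical stream paths.
def _example_split_usage():
    assert split('foo.txt') == ('.', 'foo.txt')        # no '/': stream name defaults to '.'
    assert split('a/b/foo.txt') == ('a/b', 'foo.txt')  # last '/' separates stream and file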
39 class _FileLikeObjectBase(object):
40 def __init__(self, name, mode):
46 def _before_close(orig_func):
47 @functools.wraps(orig_func)
48 def before_close_wrapper(self, *args, **kwargs):
50 raise ValueError("I/O operation on closed stream file")
51 return orig_func(self, *args, **kwargs)
52 return before_close_wrapper
57 def __exit__(self, exc_type, exc_value, traceback):
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69 def __init__(self, name, mode, num_retries=None):
70 super(ArvadosFileReaderBase, self).__init__(name, mode)
72 self.num_retries = num_retries
73 self._readline_cache = (None, None)
77 data = self.readline()
82 def decompressed_name(self):
83 return re.sub(r'\.(bz2|gz)$', '', self.name)
85 @_FileLikeObjectBase._before_close
86 def seek(self, pos, whence=os.SEEK_SET):
87 if whence == os.SEEK_CUR:
89 elif whence == os.SEEK_END:
91 self._filepos = min(max(pos, 0L), self.size())
96 @_FileLikeObjectBase._before_close
98 def readall(self, size=2**20, num_retries=None):
100 data = self.read(size, num_retries=num_retries)
105 @_FileLikeObjectBase._before_close
107 def readline(self, size=float('inf'), num_retries=None):
108 cache_pos, cache_data = self._readline_cache
109 if self.tell() == cache_pos:
113 data_size = len(data[-1])
114 while (data_size < size) and ('\n' not in data[-1]):
115 next_read = self.read(2 ** 20, num_retries=num_retries)
118 data.append(next_read)
119 data_size += len(next_read)
122 nextline_index = data.index('\n') + 1
124 nextline_index = len(data)
125 nextline_index = min(nextline_index, size)
126 self._readline_cache = (self.tell(), data[nextline_index:])
127 return data[:nextline_index]
129 @_FileLikeObjectBase._before_close
131 def decompress(self, decompress, size, num_retries=None):
132 for segment in self.readall(size, num_retries=num_retries):
133 data = decompress(segment)
137 @_FileLikeObjectBase._before_close
139 def readall_decompressed(self, size=2**20, num_retries=None):
141 if self.name.endswith('.bz2'):
142 dc = bz2.BZ2Decompressor()
143 return self.decompress(dc.decompress, size,
144 num_retries=num_retries)
145 elif self.name.endswith('.gz'):
146 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
147 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
148 size, num_retries=num_retries)
150 return self.readall(size, num_retries=num_retries)
152 @_FileLikeObjectBase._before_close
154 def readlines(self, sizehint=float('inf'), num_retries=None):
157 for s in self.readall(num_retries=num_retries):
160 if data_size >= sizehint:
162 return ''.join(data).splitlines(True)
165 raise NotImplementedError()
167 def read(self, size, num_retries=None):
168 raise NotImplementedError()
170 def readfrom(self, start, size, num_retries=None):
171 raise NotImplementedError()
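# Editor's note: an illustrative sketch (not part of the original module) of
# the read-side API defined above.  `reader` is assumed to be any concrete
# subclass instance, for example a StreamFileReader.
def _example_reader_usage(reader):
    reader.seek(0, os.SEEK_SET)
    first_line = reader.readline()              # line-oriented read with caching
    reader.seek(0)
    chunks = list(reader.readall(size=2**20))   # 1 MiB chunks until EOF
    # For names ending in .gz or .bz2, readall_decompressed() yields the
    # decompressed content instead of the stored bytes.
    return first_line, chunks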
174 class StreamFileReader(ArvadosFileReaderBase):
175 class _NameAttribute(str):
176 # The Python file API provides a plain .name attribute.
177 # Older SDK provided a name() method.
178 # This class provides both, for maximum compatibility.
182 def __init__(self, stream, segments, name):
183 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
184 self._stream = stream
185 self.segments = segments
187 def stream_name(self):
188 return self._stream.name()
191 n = self.segments[-1]
192 return n.range_start + n.range_size
194 @_FileLikeObjectBase._before_close
196 def read(self, size, num_retries=None):
197 """Read up to 'size' bytes from the stream, starting at the current file position"""
202 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
204 lr = available_chunks[0]
205 data = self._stream.readfrom(lr.locator+lr.segment_offset,
207 num_retries=num_retries)
209 self._filepos += len(data)
212 @_FileLikeObjectBase._before_close
214 def readfrom(self, start, size, num_retries=None):
215 """Read up to 'size' bytes from the stream, starting at 'start'"""
220 for lr in locators_and_ranges(self.segments, start, size):
221 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
222 num_retries=num_retries))
225 def as_manifest(self):
227 for r in self.segments:
228 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
229 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
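# Editor's note: an illustrative sketch (not part of the original module) of
# the single-stream manifest text returned by as_manifest().  The locator and
# sizes below are made-up placeholders, not real data.
_EXAMPLE_MANIFEST_TEXT = ". 930625b054ce894ac40596c3f5a0d947+33 0:33:foo.txt\n"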
232 def synchronized(orig_func):
233 @functools.wraps(orig_func)
234 def synchronized_wrapper(self, *args, **kwargs):
236 return orig_func(self, *args, **kwargs)
237 return synchronized_wrapper
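# Editor's note: an illustrative sketch (not part of the original module).  The
# @synchronized decorator assumes the instance has a `lock` attribute usable as
# a context manager (threading.RLock here, or NoopLock defined further below).
class _ExampleSynchronizedCounter(object):
    def __init__(self):
        self.lock = threading.RLock()
        self.value = 0

    @synchronized
    def increment(self):
        self.value += 1
        return self.value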
240 class StateChangeError(Exception):
241 def __init__(self, message, state, nextstate):
242 super(StateChangeError, self).__init__(message)
244 self.nextstate = nextstate
246 class _BufferBlock(object):
247 """A stand-in for a Keep block that is in the process of being written.
249 Writers can append to it, get the size, and compute the Keep locator.
250 There are three valid states:
256 Block is in the process of being uploaded to Keep; append is an error.
259 The block has been written to Keep, its internal buffer has been
260 released, fetching the block will fetch it via keep client (since we
261 discarded the internal copy), and identifiers referring to the BufferBlock
262 can be replaced with the block locator.
271 def __init__(self, blockid, starting_capacity, owner):
274 the identifier for this block
277 the initial buffer capacity
280 ArvadosFile that owns this block
283 self.blockid = blockid
284 self.buffer_block = bytearray(starting_capacity)
285 self.buffer_view = memoryview(self.buffer_block)
286 self.write_pointer = 0
287 self._state = _BufferBlock.WRITABLE
290 self.lock = threading.Lock()
291 self.wait_for_commit = threading.Event()
295 def append(self, data):
296 """Append some data to the buffer.
298 Only valid if the block is in WRITABLE state. Implements an expanding
299 buffer, doubling capacity as needed to accommodate all the data.
302 if self._state == _BufferBlock.WRITABLE:
303 while (self.write_pointer+len(data)) > len(self.buffer_block):
304 new_buffer_block = bytearray(len(self.buffer_block) * 2)
305 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
306 self.buffer_block = new_buffer_block
307 self.buffer_view = memoryview(self.buffer_block)
308 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
309 self.write_pointer += len(data)
312 raise AssertionError("Buffer block is not writable")
314 STATE_TRANSITIONS = frozenset([
316 (PENDING, COMMITTED),
321 def set_state(self, nextstate, val=None):
322 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
323 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
324 self._state = nextstate
326 if self._state == _BufferBlock.PENDING:
327 self.wait_for_commit.clear()
329 if self._state == _BufferBlock.COMMITTED:
331 self.buffer_view = None
332 self.buffer_block = None
333 self.wait_for_commit.set()
335 if self._state == _BufferBlock.ERROR:
337 self.wait_for_commit.set()
344 """The amount of data written to the buffer."""
345 return self.write_pointer
349 """The Keep locator for this buffer's contents."""
350 if self._locator is None:
351 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
355 def clone(self, new_blockid, owner):
356 if self._state == _BufferBlock.COMMITTED:
357 raise AssertionError("Cannot duplicate committed buffer block")
358 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
359 bufferblock.append(self.buffer_view[0:self.size()])
365 self.buffer_block = None
366 self.buffer_view = None
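# Editor's note: an illustrative sketch (not part of the original module) of
# the _BufferBlock lifecycle described above: append while WRITABLE, compute
# the locator (md5 of the contents plus "+<size>"), then mark the block
# PENDING so further appends are rejected.
def _example_bufferblock_lifecycle():
    bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
    bb.append("hello ")
    bb.append("world")                  # appends grow the buffer as needed
    loc = bb.locator()                  # "<md5 hex digest of contents>+11"
    bb.set_state(_BufferBlock.PENDING)  # WRITABLE -> PENDING; appends now fail
    return loc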
369 class NoopLock(object):
373 def __exit__(self, exc_type, exc_value, traceback):
376 def acquire(self, blocking=False):
383 def must_be_writable(orig_func):
384 @functools.wraps(orig_func)
385 def must_be_writable_wrapper(self, *args, **kwargs):
386 if not self.writable():
387 raise IOError(errno.EROFS, "Collection is read-only.")
388 return orig_func(self, *args, **kwargs)
389 return must_be_writable_wrapper
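# Editor's note: an illustrative sketch (not part of the original module).  The
# @must_be_writable decorator assumes the instance exposes a writable() method;
# calls on read-only instances raise IOError(errno.EROFS).
class _ExampleGuardedResource(object):
    def __init__(self, read_only=True):
        self._read_only = read_only

    def writable(self):
        return not self._read_only

    @must_be_writable
    def update(self, value):
        return value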
392 class _BlockManager(object):
393 """BlockManager handles buffer blocks.
395 Also handles background block uploads, and background block prefetch for a
396 Collection of ArvadosFiles.
400 DEFAULT_PUT_THREADS = 2
401 DEFAULT_GET_THREADS = 2
403 def __init__(self, keep):
404 """keep: KeepClient object to use"""
406 self._bufferblocks = {}
407 self._put_queue = None
408 self._put_threads = None
409 self._prefetch_queue = None
410 self._prefetch_threads = None
411 self.lock = threading.Lock()
412 self.prefetch_enabled = True
413 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
414 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
417 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
418 """Allocate a new, empty bufferblock in WRITABLE state and return it.
421 optional block identifier, otherwise one will be automatically assigned
424 optional capacity, otherwise will use default capacity
427 ArvadosFile that owns this block
431 blockid = "bufferblock%i" % len(self._bufferblocks)
432 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
433 self._bufferblocks[bufferblock.blockid] = bufferblock
437 def dup_block(self, block, owner):
438 """Create a new bufferblock initialized with the content of an existing bufferblock.
441 the buffer block to copy.
444 ArvadosFile that owns the new block
447 new_blockid = "bufferblock%i" % len(self._bufferblocks)
448 bufferblock = block.clone(new_blockid, owner)
449 self._bufferblocks[bufferblock.blockid] = bufferblock
453 def is_bufferblock(self, locator):
454 return locator in self._bufferblocks
456 def _commit_bufferblock_worker(self):
457 """Background uploader thread."""
461 bufferblock = self._put_queue.get()
462 if bufferblock is None:
465 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
466 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
468 except Exception as e:
469 bufferblock.set_state(_BufferBlock.ERROR, e)
471 if self._put_queue is not None:
472 self._put_queue.task_done()
475 def start_put_threads(self):
476 if self._put_threads is None:
477 # Start uploader threads.
479 # If we don't limit the Queue size, the upload queue can quickly
480 # grow to take up gigabytes of RAM if the writing process is
481 # generating data more quickly than it can be sent to the Keep
484 # With two upload threads and a queue size of 2, this means up to 4
485 # blocks pending. If they are full 64 MiB blocks, that means up to
486 # 256 MiB of internal buffering, which is the same size as the
487 # default download block cache in KeepClient.
488 self._put_queue = Queue.Queue(maxsize=2)
490 self._put_threads = []
491 for i in xrange(0, self.num_put_threads):
492 thread = threading.Thread(target=self._commit_bufferblock_worker)
493 self._put_threads.append(thread)
497 def _block_prefetch_worker(self):
498 """The background downloader thread."""
501 b = self._prefetch_queue.get()
509 def start_get_threads(self):
510 if self._prefetch_threads is None:
511 self._prefetch_queue = Queue.Queue()
512 self._prefetch_threads = []
513 for i in xrange(0, self.num_get_threads):
514 thread = threading.Thread(target=self._block_prefetch_worker)
515 self._prefetch_threads.append(thread)
521 def stop_threads(self):
522 """Shut down and wait for background upload and download threads to finish."""
524 if self._put_threads is not None:
525 for t in self._put_threads:
526 self._put_queue.put(None)
527 for t in self._put_threads:
529 self._put_threads = None
530 self._put_queue = None
532 if self._prefetch_threads is not None:
533 for t in self._prefetch_threads:
534 self._prefetch_queue.put(None)
535 for t in self._prefetch_threads:
537 self._prefetch_threads = None
538 self._prefetch_queue = None
543 def __exit__(self, exc_type, exc_value, traceback):
549 def commit_bufferblock(self, block, sync):
550 """Initiate a background upload of a bufferblock.
553 The block object to upload
556 If `sync` is True, upload the block synchronously.
557 If `sync` is False, upload the block asynchronously. This will
558 return immediately unless the upload queue is at capacity, in
559 which case it will wait on an upload queue slot.
564 # Mark the block as PENDING to disallow any further appends.
565 block.set_state(_BufferBlock.PENDING)
566 except StateChangeError as e:
567 if e.state == _BufferBlock.PENDING:
569 block.wait_for_commit.wait()
572 if block.state() == _BufferBlock.COMMITTED:
574 elif block.state() == _BufferBlock.ERROR:
581 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
582 block.set_state(_BufferBlock.COMMITTED, loc)
583 except Exception as e:
584 block.set_state(_BufferBlock.ERROR, e)
587 self.start_put_threads()
588 self._put_queue.put(block)
591 def get_bufferblock(self, locator):
592 return self._bufferblocks.get(locator)
595 def delete_bufferblock(self, locator):
596 bb = self._bufferblocks[locator]
598 del self._bufferblocks[locator]
600 def get_block_contents(self, locator, num_retries, cache_only=False):
603 First checks to see if the locator is a BufferBlock and returns its
604 contents; if not, passes the request through to KeepClient.get().
608 if locator in self._bufferblocks:
609 bufferblock = self._bufferblocks[locator]
610 if bufferblock.state() != _BufferBlock.COMMITTED:
611 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
613 locator = bufferblock._locator
615 return self._keep.get_from_cache(locator)
617 return self._keep.get(locator, num_retries=num_retries)
619 def commit_all(self):
620 """Commit all outstanding buffer blocks.
622 This is a synchronous call, and will not return until all buffer blocks
623 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
627 items = self._bufferblocks.items()
630 if v.state() != _BufferBlock.COMMITTED:
631 v.owner.flush(sync=False)
634 if self._put_queue is not None:
635 self._put_queue.join()
639 if v.state() == _BufferBlock.ERROR:
640 err.append((v.locator(), v.error))
642 raise KeepWriteError("Error writing some blocks", err, label="block")
645 # flush again with sync=True to remove committed bufferblocks from
648 v.owner.flush(sync=True)
650 def block_prefetch(self, locator):
651 """Initiate a background download of a block.
653 This assumes that the underlying KeepClient implements a block cache,
654 so repeated requests for the same block will not result in repeated
655 downloads (unless the block is evicted from the cache). This method
660 if not self.prefetch_enabled:
663 if self._keep.get_from_cache(locator) is not None:
667 if locator in self._bufferblocks:
670 self.start_get_threads()
671 self._prefetch_queue.put(locator)
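# Editor's note: an illustrative sketch (not part of the original module) of
# the write path through _BlockManager.  `keep` is assumed to be an
# arvados.keep.KeepClient; normally Collection/ArvadosFile drive these calls
# rather than application code.
def _example_block_manager_flow(keep, payload):
    bm = _BlockManager(keep)
    try:
        bb = bm.alloc_bufferblock()            # new WRITABLE buffer block
        bb.append(payload)
        bm.commit_bufferblock(bb, sync=True)   # uploads and moves to COMMITTED
        return bm.get_block_contents(bb.locator(), num_retries=2)
    finally:
        bm.stop_threads()                      # stop uploader/prefetch threads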
674 class ArvadosFile(object):
675 """Represent a file in a Collection.
677 ArvadosFile manages the underlying representation of a file in Keep as a
678 sequence of segments spanning a set of blocks, and implements random
681 This object may be accessed from multiple threads.
685 def __init__(self, parent, name, stream=[], segments=[]):
687 ArvadosFile constructor.
690 a list of Range objects representing a block stream
693 a list of Range objects representing segments
697 self._committed = False
699 self.lock = parent.root_collection().lock
701 self._add_segment(stream, s.locator, s.range_size)
702 self._current_bblock = None
705 return self.parent.writable()
709 return copy.copy(self._segments)
712 def clone(self, new_parent, new_name):
713 """Make a copy of this file."""
714 cp = ArvadosFile(new_parent, new_name)
715 cp.replace_contents(self)
720 def replace_contents(self, other):
721 """Replace segments of this file with segments from another `ArvadosFile` object."""
725 for other_segment in other.segments():
726 new_loc = other_segment.locator
727 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
728 if other_segment.locator not in map_loc:
729 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
730 if bufferblock.state() != _BufferBlock.WRITABLE:
731 map_loc[other_segment.locator] = bufferblock.locator()
733 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
734 new_loc = map_loc[other_segment.locator]
736 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
738 self._committed = False
740 def __eq__(self, other):
743 if not isinstance(other, ArvadosFile):
746 othersegs = other.segments()
748 if len(self._segments) != len(othersegs):
750 for i in xrange(0, len(othersegs)):
751 seg1 = self._segments[i]
756 if self.parent._my_block_manager().is_bufferblock(loc1):
757 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
759 if other.parent._my_block_manager().is_bufferblock(loc2):
760 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
762 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
763 seg1.range_start != seg2.range_start or
764 seg1.range_size != seg2.range_size or
765 seg1.segment_offset != seg2.segment_offset):
770 def __ne__(self, other):
771 return not self.__eq__(other)
774 def set_committed(self):
775 """Set the committed flag to True."""
776 self._committed = True
780 """Get whether this is committed or not."""
781 return self._committed
785 def truncate(self, size):
786 """Shrink the size of the file.
788 If `size` is less than the size of the file, the file contents after
789 `size` will be discarded. If `size` is greater than the current size
790 of the file, an IOError will be raised.
793 if size < self.size():
795 for r in self._segments:
796 range_end = r.range_start+r.range_size
797 if r.range_start >= size:
798 # segment is past the truncate size, all done
800 elif size < range_end:
801 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
802 nr.segment_offset = r.segment_offset
808 self._segments = new_segs
809 self._committed = False
810 elif size > self.size():
811 raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
813 def readfrom(self, offset, size, num_retries, exact=False):
814 """Read up to `size` bytes from the file starting at `offset`.
817 If False (default), return less data than requested if the read
818 crosses a block boundary and the next block isn't cached. If True,
819 only return less data than requested when hitting EOF.
823 if size == 0 or offset >= self.size():
825 readsegs = locators_and_ranges(self._segments, offset, size)
826 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
831 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
833 blockview = memoryview(block)
834 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
840 if lr.locator not in locs:
841 self.parent._my_block_manager().block_prefetch(lr.locator)
846 def _repack_writes(self, num_retries):
847 """Test if the buffer block contains more data than the file's segments reference.
849 This happens when a buffered write overwrites a file range written in
850 a previous buffered write. Re-pack the buffer block for efficiency
851 and to avoid leaking information.
854 segs = self._segments
856 # Sum up the segments to get the total number of bytes in the file that
857 # reference the current buffer block.
858 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
859 write_total = sum([s.range_size for s in bufferblock_segs])
861 if write_total < self._current_bblock.size():
862 # There is more data in the buffer block than is actually accounted for by segments, so
863 # re-pack by copying only the referenced data into a new buffer block.
864 contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
865 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
866 for t in bufferblock_segs:
867 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
868 t.segment_offset = new_bb.size() - t.range_size
870 self._current_bblock = new_bb
874 def writeto(self, offset, data, num_retries):
875 """Write `data` to the file starting at `offset`.
877 This will update existing bytes and/or extend the size of the file as
884 if offset > self.size():
885 raise ArgumentError("Offset is past the end of the file")
887 if len(data) > config.KEEP_BLOCK_SIZE:
888 # Chunk it up into smaller writes
890 dataview = memoryview(data)
892 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
893 n += config.KEEP_BLOCK_SIZE
896 self._committed = False
898 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
899 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
901 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
902 self._repack_writes(num_retries)
903 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
904 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
905 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
907 self._current_bblock.append(data)
909 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
911 self.parent.notify(WRITE, self.parent, self.name, (self, self))
916 def flush(self, sync=True, num_retries=0):
917 """Flush the current bufferblock to Keep.
920 If True, commit block synchronously, wait until buffer block has been written.
921 If False, commit block asynchronously, return immediately after putting block into
927 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
928 if self._current_bblock.state() == _BufferBlock.WRITABLE:
929 self._repack_writes(num_retries)
930 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
934 for s in self._segments:
935 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
937 if bb.state() != _BufferBlock.COMMITTED:
938 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
939 to_delete.add(s.locator)
940 s.locator = bb.locator()
942 self.parent._my_block_manager().delete_bufferblock(s)
944 self.parent.notify(MOD, self.parent, self.name, (self, self))
948 def add_segment(self, blocks, pos, size):
949 """Add a segment to the end of the file.
951 `pos` and `size` reference a section of the stream described by
952 `blocks` (a list of Range objects)
955 self._add_segment(blocks, pos, size)
957 def _add_segment(self, blocks, pos, size):
958 """Internal implementation of add_segment."""
959 self._committed = False
960 for lr in locators_and_ranges(blocks, pos, size):
961 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
962 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
963 self._segments.append(r)
967 """Get the file size."""
969 n = self._segments[-1]
970 return n.range_start + n.range_size
975 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
978 for segment in self._segments:
979 loc = segment.locator
980 if loc.startswith("bufferblock"):
981 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
982 if portable_locators:
983 loc = KeepLocator(loc).stripped()
984 filestream.append(LocatorAndRange(loc, locator_block_size(loc),
985 segment.segment_offset, segment.range_size))
986 buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
992 def _reparent(self, newparent, newname):
993 self._committed = False
994 self.flush(sync=True)
995 self.parent.remove(self.name)
996 self.parent = newparent
998 self.lock = self.parent.root_collection().lock
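# Editor's note: an illustrative sketch (not part of the original module) of
# random-access writes and reads through ArvadosFile.  `coll` is assumed to be
# a writable arvados.collection.Collection serving as the parent; files are
# normally created via the Collection API rather than constructed directly.
def _example_arvados_file_roundtrip(coll):
    f = ArvadosFile(coll, 'example.txt')
    f.writeto(0, 'hello world', num_retries=2)
    f.writeto(6, 'keep!', num_retries=2)      # overwrite bytes 6-10 in place
    f.flush(sync=True)                        # commit buffer blocks to Keep
    return f.readfrom(0, f.size(), num_retries=2, exact=True)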
1001 class ArvadosFileReader(ArvadosFileReaderBase):
1002 """Wraps ArvadosFile in a file-like object supporting reading only.
1004 Be aware that this class is NOT thread safe as there is no locking around
1005 updating the file pointer.
1009 def __init__(self, arvadosfile, num_retries=None):
1010 super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1011 self.arvadosfile = arvadosfile
1014 return self.arvadosfile.size()
1016 def stream_name(self):
1017 return self.arvadosfile.parent.stream_name()
1019 @_FileLikeObjectBase._before_close
1021 def read(self, size=None, num_retries=None):
1022 """Read up to `size` bytes from the file and return the result.
1024 Starts at the current file position. If `size` is None, read the
1025 entire remainder of the file.
1029 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1032 self._filepos += len(rd)
1033 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1034 return ''.join(data)
1036 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1037 self._filepos += len(data)
1040 @_FileLikeObjectBase._before_close
1042 def readfrom(self, offset, size, num_retries=None):
1043 """Read up to `size` bytes from the stream, starting at the specified file offset.
1045 This method does not change the file position.
1047 return self.arvadosfile.readfrom(offset, size, num_retries)
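# Editor's note: an illustrative sketch (not part of the original module)
# contrasting read() and readfrom() above: read() advances the file position,
# readfrom() does not.  `af` is assumed to be an existing ArvadosFile.
def _example_reader_positioning(af):
    reader = ArvadosFileReader(af, num_retries=2)
    head = reader.read(16)          # consumes up to 16 bytes, advances the pointer
    again = reader.readfrom(0, 16)  # absolute offset; pointer is left unchanged
    return head, again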
1053 class ArvadosFileWriter(ArvadosFileReader):
1054 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1056 Be aware that this class is NOT thread safe as there is no locking around
1057 updating the file pointer.
1061 def __init__(self, arvadosfile, mode, num_retries=None):
1062 super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1065 @_FileLikeObjectBase._before_close
1067 def write(self, data, num_retries=None):
1068 if self.mode[0] == "a":
1069 self.arvadosfile.writeto(self.size(), data, num_retries)
1071 self.arvadosfile.writeto(self._filepos, data, num_retries)
1072 self._filepos += len(data)
1075 @_FileLikeObjectBase._before_close
1077 def writelines(self, seq, num_retries=None):
1079 self.write(s, num_retries=num_retries)
1081 @_FileLikeObjectBase._before_close
1082 def truncate(self, size=None):
1084 size = self._filepos
1085 self.arvadosfile.truncate(size)
1086 if self._filepos > self.size():
1087 self._filepos = self.size()
1089 @_FileLikeObjectBase._before_close
1091 self.arvadosfile.flush()
1096 super(ArvadosFileWriter, self).close()
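# Editor's note: an illustrative sketch (not part of the original module).  The
# reader/writer wrappers above are normally obtained from
# arvados.collection.Collection.open() rather than constructed directly; a
# hypothetical append-then-read round trip might look like this.
def _example_collection_io(coll):
    with coll.open('notes.txt', 'a') as writer:    # ArvadosFileWriter
        writer.write('first line\n')
        writer.writelines(['second line\n', 'third line\n'])
    with coll.open('notes.txt', 'r') as reader:    # ArvadosFileReader
        return reader.readlines()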