from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method
_logger = logging.getLogger('arvados.arvfile')

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name

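# A doctest-style sketch of split()'s behavior:
#
#     >>> split('path/to/stream/file.txt')
#     ('path/to/stream', 'file.txt')
#     >>> split('file.txt')
#     ('.', 'file.txt')
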
class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

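# The base class leaves size(), read(), and readfrom() abstract; seek(),
# readline(), readlines(), and readall() above are written purely in terms of
# them.  A minimal, hypothetical in-memory subclass for illustration only:
#
#     class _StringReader(ArvadosFileReaderBase):
#         def __init__(self, name, data):
#             super(_StringReader, self).__init__(name, 'rb')
#             self._data = data
#
#         def size(self):
#             return len(self._data)
#
#         def read(self, size, num_retries=None):
#             data = self._data[self._filepos:self._filepos+size]
#             self._filepos += len(data)
#             return data
#
#         def readfrom(self, start, size, num_retries=None):
#             return self._data[start:start+size]
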
class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

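    # For a single-segment file, as_manifest() yields one manifest stream
    # line, e.g. (using the Keep locator of the 3-byte block "bar"):
    #
    #     . 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar.txt\n
    #
    # that is: stream name, block locator(s), then position:size:filename.
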
def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper

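# synchronized() assumes the instance has a `self.lock` attribute holding a
# threading.Lock, threading.RLock, or NoopLock.  A hypothetical usage sketch:
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.RLock()
#             self.n = 0
#
#         @synchronized
#         def bump(self):
#             self.n += 1    # runs with self.lock held
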
class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are four valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      The block upload failed; the exception is available in `.error`.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state. Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    @synchronized
    def set_state(self, nextstate, val=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.ERROR) or
            (self._state == _BufferBlock.ERROR and nextstate == _BufferBlock.PENDING)):
            self._state = nextstate

            if self._state == _BufferBlock.PENDING:
                self.wait_for_commit.clear()

            if self._state == _BufferBlock.COMMITTED:
                self._locator = val
                self.buffer_view = None
                self.buffer_block = None
                self.wait_for_commit.set()

            if self._state == _BufferBlock.ERROR:
                self.error = val
                self.wait_for_commit.set()
        else:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

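    # Sketch of the normal lifecycle (illustrative only; `keep` and `loc` are
    # assumed names):
    #
    #     bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
    #     bb.append("some data")              # WRITABLE: buffer grows as needed
    #     bb.set_state(_BufferBlock.PENDING)  # further append()s now raise
    #     loc = keep.put(bb.buffer_view[0:bb.write_pointer].tobytes())
    #     bb.set_state(_BufferBlock.COMMITTED, loc)   # buffer released
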
class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)

            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    @synchronized
    def start_put_threads(self):
        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # servers.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending. If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)

            self._put_threads = []
            for i in xrange(0, self.num_put_threads):
                thread = threading.Thread(target=self._commit_bufferblock_worker)
                self._put_threads.append(thread)
                thread.daemon = False
                thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                pass

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    def __del__(self):
        self.stop_threads()

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously. This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so that no more appends are allowed.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING and sync:
                block.wait_for_commit.wait()
                if block.state() == _BufferBlock.ERROR:
                    raise block.error
            return

        if sync:
            try:
                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

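    # Usage sketch: sync=True blocks until the data is in Keep and raises on
    # failure; sync=False merely enqueues the block for the uploader threads.
    #
    #     manager.commit_bufferblock(bb, sync=True)   # raises on error
    #     manager.commit_bufferblock(bb, sync=False)  # returns once queued;
    #                                                 # errors surface later,
    #                                                 # e.g. via commit_all()
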
    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and, if so, returns
        its contents; if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded. Raises KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.) This method
        does not block the caller.

        """
        if not self.prefetch_enabled:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)

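    # Illustrative round trip (a sketch; assumes `keep` is a KeepClient):
    #
    #     bm = _BlockManager(keep)
    #     try:
    #         bb = bm.alloc_bufferblock()
    #         bb.append("hello")
    #         bm.commit_bufferblock(bb, sync=True)
    #         print bm.get_block_contents(bb.locator(), num_retries=0)
    #     finally:
    #         bm.stop_threads()
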
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._committed = False

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_committed(self):
        """Set committed flag to True"""
        self._committed = True

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded. If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._committed = False
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached. If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
            else:
                break
        return ''.join(data)

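    # Sketch of the exact= distinction (`f` is an ArvadosFile):
    #
    #     f.readfrom(0, 1000, 0)              # may return fewer than 1000
    #                                         # bytes if the next block is
    #                                         # not yet cached
    #     f.readfrom(0, 1000, 0, exact=True)  # short only at end-of-file
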
    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write. Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

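    # Worked example: writing "aaaa" at offset 0 and then "bbbb" at offset 0
    # leaves 8 bytes ("aaaabbbb") in the buffer block while the segments only
    # reference the last 4, so write_total (4) < size (8) and the block gets
    # repacked down to just "bbbb" before being committed.
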
    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self._committed = False

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        """
        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb is not None:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._committed = False
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._committed = False
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position. If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass

class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.flush()
            super(ArvadosFileWriter, self).close()
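
# End-to-end usage sketch (assumes the arvados.collection API, where
# Collection.open() returns an ArvadosFileWriter for write modes):
#
#     import arvados.collection
#
#     c = arvados.collection.Collection()
#     with c.open('data/hello.txt', 'w') as f:
#         f.write('hello world\n')
#     c.save_new('example collection')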