14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range
18 from .retry import retry_method
23 _logger = logging.getLogger('arvados.arvfile')
26 """split(path) -> streamname, filename
28 Separate the stream name and file name in a /-separated stream path and
29 return a tuple (stream_name, file_name). If no stream name is available,
34 stream_name, file_name = path.rsplit('/', 1)
35 except ValueError: # No / in string
36 stream_name, file_name = '.', path
37 return stream_name, file_name
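# Illustrative examples (not part of the original file, added for clarity):
#
#     split("foo/bar/baz.txt")  -> ('foo/bar', 'baz.txt')
#     split("baz.txt")          -> ('.', 'baz.txt')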
39 class _FileLikeObjectBase(object):
40 def __init__(self, name, mode):
46 def _before_close(orig_func):
47 @functools.wraps(orig_func)
48 def before_close_wrapper(self, *args, **kwargs):
50 raise ValueError("I/O operation on closed stream file")
51 return orig_func(self, *args, **kwargs)
52 return before_close_wrapper
57 def __exit__(self, exc_type, exc_value, traceback):
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69 def __init__(self, name, mode, num_retries=None):
70 super(ArvadosFileReaderBase, self).__init__(name, mode)
72 self.num_retries = num_retries
73 self._readline_cache = (None, None)
77 data = self.readline()
82 def decompressed_name(self):
83 return re.sub(r'\.(bz2|gz)$', '', self.name)
85 @_FileLikeObjectBase._before_close
86 def seek(self, pos, whence=os.SEEK_SET):
87 if whence == os.SEEK_CUR:
89 elif whence == os.SEEK_END:
91 self._filepos = min(max(pos, 0L), self.size())
96 @_FileLikeObjectBase._before_close
98 def readall(self, size=2**20, num_retries=None):
100 data = self.read(size, num_retries=num_retries)
105 @_FileLikeObjectBase._before_close
107 def readline(self, size=float('inf'), num_retries=None):
108 cache_pos, cache_data = self._readline_cache
109 if self.tell() == cache_pos:
111 self._filepos += len(cache_data)
114 data_size = len(data[-1])
115 while (data_size < size) and ('\n' not in data[-1]):
116 next_read = self.read(2 ** 20, num_retries=num_retries)
119 data.append(next_read)
120 data_size += len(next_read)
123 nextline_index = data.index('\n') + 1
125 nextline_index = len(data)
126 nextline_index = min(nextline_index, size)
127 self._filepos -= len(data) - nextline_index
128 self._readline_cache = (self.tell(), data[nextline_index:])
129 return data[:nextline_index]
131 @_FileLikeObjectBase._before_close
133 def decompress(self, decompress, size, num_retries=None):
134 for segment in self.readall(size, num_retries=num_retries):
135 data = decompress(segment)
139 @_FileLikeObjectBase._before_close
141 def readall_decompressed(self, size=2**20, num_retries=None):
143 if self.name.endswith('.bz2'):
144 dc = bz2.BZ2Decompressor()
145 return self.decompress(dc.decompress, size,
146 num_retries=num_retries)
147 elif self.name.endswith('.gz'):
148 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
149 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
150 size, num_retries=num_retries)
152 return self.readall(size, num_retries=num_retries)
154 @_FileLikeObjectBase._before_close
156 def readlines(self, sizehint=float('inf'), num_retries=None):
159 for s in self.readall(num_retries=num_retries):
162 if data_size >= sizehint:
164 return ''.join(data).splitlines(True)
167 raise NotImplementedError()
169 def read(self, size, num_retries=None):
170 raise NotImplementedError()
172 def readfrom(self, start, size, num_retries=None):
173 raise NotImplementedError()
176 class StreamFileReader(ArvadosFileReaderBase):
177 class _NameAttribute(str):
178 # The Python file API provides a plain .name attribute.
179 # The older SDK provided a name() method.
180 # This class provides both, for maximum compatibility.
184 def __init__(self, stream, segments, name):
185 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
186 self._stream = stream
187 self.segments = segments
189 def stream_name(self):
190 return self._stream.name()
193 n = self.segments[-1]
194 return n.range_start + n.range_size
196 @_FileLikeObjectBase._before_close
198 def read(self, size, num_retries=None):
199 """Read up to 'size' bytes from the stream, starting at the current file position"""
204 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
206 lr = available_chunks[0]
207 data = self._stream.readfrom(lr.locator+lr.segment_offset,
209 num_retries=num_retries)
211 self._filepos += len(data)
214 @_FileLikeObjectBase._before_close
216 def readfrom(self, start, size, num_retries=None):
217 """Read up to 'size' bytes from the stream, starting at 'start'"""
222 for lr in locators_and_ranges(self.segments, start, size):
223 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
224 num_retries=num_retries))
227 def as_manifest(self):
229 for r in self.segments:
230 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
231 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234 def synchronized(orig_func):
235 @functools.wraps(orig_func)
236 def synchronized_wrapper(self, *args, **kwargs):
238 return orig_func(self, *args, **kwargs)
239 return synchronized_wrapper
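# Usage sketch for @synchronized (hypothetical Counter class for illustration;
# assumes the instance exposes a `lock` attribute supporting the context-manager
# protocol, as the classes below do):
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.Lock()
#             self.n = 0
#
#         @synchronized
#         def bump(self):
#             self.n += 1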
242 class StateChangeError(Exception):
243 def __init__(self, message, state, nextstate):
244 super(StateChangeError, self).__init__(message)
246 self.nextstate = nextstate
248 class _BufferBlock(object):
249 """A stand-in for a Keep block that is in the process of being written.
251 Writers can append to it, get the size, and compute the Keep locator.
252 There are three valid states:
258 Block is in the process of being uploaded to Keep; appending to it is an error.
261 The block has been written to Keep, its internal buffer has been
262 released, fetching the block will fetch it via keep client (since we
263 discarded the internal copy), and identifiers referring to the BufferBlock
264 can be replaced with the block locator.
273 def __init__(self, blockid, starting_capacity, owner):
276 the identifier for this block
279 the initial buffer capacity
282 ArvadosFile that owns this block
285 self.blockid = blockid
286 self.buffer_block = bytearray(starting_capacity)
287 self.buffer_view = memoryview(self.buffer_block)
288 self.write_pointer = 0
289 self._state = _BufferBlock.WRITABLE
292 self.lock = threading.Lock()
293 self.wait_for_commit = threading.Event()
297 def append(self, data):
298 """Append some data to the buffer.
300 Only valid if the block is in WRITABLE state. Implements an expanding
301 buffer, doubling capacity as needed to accommodate all the data.
304 if self._state == _BufferBlock.WRITABLE:
305 while (self.write_pointer+len(data)) > len(self.buffer_block):
306 new_buffer_block = bytearray(len(self.buffer_block) * 2)
307 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
308 self.buffer_block = new_buffer_block
309 self.buffer_view = memoryview(self.buffer_block)
310 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
311 self.write_pointer += len(data)
314 raise AssertionError("Buffer block is not writable")
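# Illustrative arithmetic: a block created with starting_capacity=8 that
# receives a single 20-byte append doubles its bytearray 8 -> 16 -> 32 before
# copying the data in, leaving write_pointer == 20 and a 32-byte buffer.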
316 STATE_TRANSITIONS = frozenset([
318 (PENDING, COMMITTED),
323 def set_state(self, nextstate, val=None):
324 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
325 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
326 self._state = nextstate
328 if self._state == _BufferBlock.PENDING:
329 self.wait_for_commit.clear()
331 if self._state == _BufferBlock.COMMITTED:
333 self.buffer_view = None
334 self.buffer_block = None
335 self.wait_for_commit.set()
337 if self._state == _BufferBlock.ERROR:
339 self.wait_for_commit.set()
346 """The amount of data written to the buffer."""
347 return self.write_pointer
351 """The Keep locator for this buffer's contents."""
352 if self._locator is None:
353 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
357 def clone(self, new_blockid, owner):
358 if self._state == _BufferBlock.COMMITTED:
359 raise AssertionError("Cannot duplicate committed buffer block")
360 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
361 bufferblock.append(self.buffer_view[0:self.size()])
367 self.buffer_block = None
368 self.buffer_view = None
371 class NoopLock(object):
375 def __exit__(self, exc_type, exc_value, traceback):
378 def acquire(self, blocking=False):
385 def must_be_writable(orig_func):
386 @functools.wraps(orig_func)
387 def must_be_writable_wrapper(self, *args, **kwargs):
388 if not self.writable():
389 raise IOError(errno.EROFS, "Collection is read-only.")
390 return orig_func(self, *args, **kwargs)
391 return must_be_writable_wrapper
394 class _BlockManager(object):
395 """BlockManager handles buffer blocks.
397 Also handles background block uploads, and background block prefetch for a
398 Collection of ArvadosFiles.
402 DEFAULT_PUT_THREADS = 2
403 DEFAULT_GET_THREADS = 2
405 def __init__(self, keep):
406 """keep: KeepClient object to use"""
408 self._bufferblocks = {}
409 self._put_queue = None
410 self._put_threads = None
411 self._prefetch_queue = None
412 self._prefetch_threads = None
413 self.lock = threading.Lock()
414 self.prefetch_enabled = True
415 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
416 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
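# Minimal usage sketch for _BlockManager (the `keep_client` name is a
# placeholder for a configured KeepClient instance, not defined here):
#
#     manager = _BlockManager(keep_client)
#     bb = manager.alloc_bufferblock(owner=None)
#     bb.append(b"hello")
#     manager.commit_bufferblock(bb, sync=True)   # uploads and sets COMMITTED
#     manager.stop_threads()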
419 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
420 """Allocate a new, empty bufferblock in WRITABLE state and return it.
423 optional block identifier, otherwise one will be automatically assigned
426 optional capacity, otherwise will use default capacity
429 ArvadosFile that owns this block
433 blockid = "bufferblock%i" % len(self._bufferblocks)
434 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
435 self._bufferblocks[bufferblock.blockid] = bufferblock
439 def dup_block(self, block, owner):
440 """Create a new bufferblock initialized with the content of an existing bufferblock.
443 the buffer block to copy.
446 ArvadosFile that owns the new block
449 new_blockid = "bufferblock%i" % len(self._bufferblocks)
450 bufferblock = block.clone(new_blockid, owner)
451 self._bufferblocks[bufferblock.blockid] = bufferblock
455 def is_bufferblock(self, locator):
456 return locator in self._bufferblocks
458 def _commit_bufferblock_worker(self):
459 """Background uploader thread."""
463 bufferblock = self._put_queue.get()
464 if bufferblock is None:
467 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
468 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
470 except Exception as e:
471 bufferblock.set_state(_BufferBlock.ERROR, e)
473 if self._put_queue is not None:
474 self._put_queue.task_done()
477 def start_put_threads(self):
478 if self._put_threads is None:
479 # Start uploader threads.
481 # If we don't limit the Queue size, the upload queue can quickly
482 # grow to take up gigabytes of RAM if the writing process is
483 generating data more quickly than it can be sent to the Keep
486 # With two upload threads and a queue size of 2, this means up to 4
487 # blocks pending. If they are full 64 MiB blocks, that means up to
488 # 256 MiB of internal buffering, which is the same size as the
489 # default download block cache in KeepClient.
490 self._put_queue = Queue.Queue(maxsize=2)
492 self._put_threads = []
493 for i in xrange(0, self.num_put_threads):
494 thread = threading.Thread(target=self._commit_bufferblock_worker)
495 self._put_threads.append(thread)
499 def _block_prefetch_worker(self):
500 """The background downloader thread."""
503 b = self._prefetch_queue.get()
511 def start_get_threads(self):
512 if self._prefetch_threads is None:
513 self._prefetch_queue = Queue.Queue()
514 self._prefetch_threads = []
515 for i in xrange(0, self.num_get_threads):
516 thread = threading.Thread(target=self._block_prefetch_worker)
517 self._prefetch_threads.append(thread)
523 def stop_threads(self):
524 """Shut down and wait for background upload and download threads to finish."""
526 if self._put_threads is not None:
527 for t in self._put_threads:
528 self._put_queue.put(None)
529 for t in self._put_threads:
531 self._put_threads = None
532 self._put_queue = None
534 if self._prefetch_threads is not None:
535 for t in self._prefetch_threads:
536 self._prefetch_queue.put(None)
537 for t in self._prefetch_threads:
539 self._prefetch_threads = None
540 self._prefetch_queue = None
545 def __exit__(self, exc_type, exc_value, traceback):
548 def commit_bufferblock(self, block, sync):
549 """Initiate a background upload of a bufferblock.
552 The block object to upload
555 If `sync` is True, upload the block synchronously.
556 If `sync` is False, upload the block asynchronously. This will
557 return immediately unless the upload queue is at capacity, in
558 which case it will wait on an upload queue slot.
563 # Mark the block as PENDING to disallow any more appends.
564 block.set_state(_BufferBlock.PENDING)
565 except StateChangeError as e:
566 if e.state == _BufferBlock.PENDING:
568 block.wait_for_commit.wait()
571 if block.state() == _BufferBlock.COMMITTED:
573 elif block.state() == _BufferBlock.ERROR:
580 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
581 block.set_state(_BufferBlock.COMMITTED, loc)
582 except Exception as e:
583 block.set_state(_BufferBlock.ERROR, e)
586 self.start_put_threads()
587 self._put_queue.put(block)
590 def get_bufferblock(self, locator):
591 return self._bufferblocks.get(locator)
594 def delete_bufferblock(self, locator):
595 bb = self._bufferblocks[locator]
597 del self._bufferblocks[locator]
599 def get_block_contents(self, locator, num_retries, cache_only=False):
602 First checks to see if the locator is a BufferBlock and returns that;
603 if not, passes the request through to KeepClient.get().
607 if locator in self._bufferblocks:
608 bufferblock = self._bufferblocks[locator]
609 if bufferblock.state() != _BufferBlock.COMMITTED:
610 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
612 locator = bufferblock._locator
614 return self._keep.get_from_cache(locator)
616 return self._keep.get(locator, num_retries=num_retries)
618 def commit_all(self):
619 """Commit all outstanding buffer blocks.
621 This is a synchronous call, and will not return until all buffer blocks
622 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
626 items = self._bufferblocks.items()
629 if v.state() != _BufferBlock.COMMITTED:
630 v.owner.flush(sync=False)
633 if self._put_queue is not None:
634 self._put_queue.join()
638 if v.state() == _BufferBlock.ERROR:
639 err.append((v.locator(), v.error))
641 raise KeepWriteError("Error writing some blocks", err, label="block")
644 # flush again with sync=True to remove committed bufferblocks from
647 v.owner.flush(sync=True)
649 def block_prefetch(self, locator):
650 """Initiate a background download of a block.
652 This assumes that the underlying KeepClient implements a block cache,
653 so repeated requests for the same block will not result in repeated
654 downloads (unless the block is evicted from the cache). This method
659 if not self.prefetch_enabled:
662 if self._keep.get_from_cache(locator) is not None:
666 if locator in self._bufferblocks:
669 self.start_get_threads()
670 self._prefetch_queue.put(locator)
673 class ArvadosFile(object):
674 """Represent a file in a Collection.
676 ArvadosFile manages the underlying representation of a file in Keep as a
677 sequence of segments spanning a set of blocks, and implements random
680 This object may be accessed from multiple threads.
684 def __init__(self, parent, name, stream=[], segments=[]):
686 ArvadosFile constructor.
689 a list of Range objects representing a block stream
692 a list of Range objects representing segments
696 self._committed = False
698 self.lock = parent.root_collection().lock
700 self._add_segment(stream, s.locator, s.range_size)
701 self._current_bblock = None
704 return self.parent.writable()
708 return copy.copy(self._segments)
711 def clone(self, new_parent, new_name):
712 """Make a copy of this file."""
713 cp = ArvadosFile(new_parent, new_name)
714 cp.replace_contents(self)
719 def replace_contents(self, other):
720 """Replace segments of this file with segments from another `ArvadosFile` object."""
724 for other_segment in other.segments():
725 new_loc = other_segment.locator
726 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
727 if other_segment.locator not in map_loc:
728 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
729 if bufferblock.state() != _BufferBlock.WRITABLE:
730 map_loc[other_segment.locator] = bufferblock.locator()
732 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
733 new_loc = map_loc[other_segment.locator]
735 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
737 self._committed = False
739 def __eq__(self, other):
742 if not isinstance(other, ArvadosFile):
745 othersegs = other.segments()
747 if len(self._segments) != len(othersegs):
749 for i in xrange(0, len(othersegs)):
750 seg1 = self._segments[i]
755 if self.parent._my_block_manager().is_bufferblock(loc1):
756 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
758 if other.parent._my_block_manager().is_bufferblock(loc2):
759 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
761 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
762 seg1.range_start != seg2.range_start or
763 seg1.range_size != seg2.range_size or
764 seg1.segment_offset != seg2.segment_offset):
769 def __ne__(self, other):
770 return not self.__eq__(other)
773 def set_committed(self):
774 """Set committed flag to False"""
775 self._committed = True
779 """Get whether this is committed or not."""
780 return self._committed
784 def truncate(self, size):
785 """Shrink the size of the file.
787 If `size` is less than the size of the file, the file contents after
788 `size` will be discarded. If `size` is greater than the current size
789 of the file, an IOError will be raised.
792 if size < self.size():
794 for r in self._segments:
795 range_end = r.range_start+r.range_size
796 if r.range_start >= size:
797 # segment is past the truncate size, all done
799 elif size < range_end:
800 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
801 nr.segment_offset = r.segment_offset
807 self._segments = new_segs
808 self._committed = False
809 elif size > self.size():
810 raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
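# Illustrative behaviour: truncating a 100-byte file to 60 drops segments that
# start at or past byte 60 and trims the segment straddling the boundary, so
# size() afterwards reports 60; truncate(200) on the same file raises
# IOError(EINVAL) because extending the file is not supported.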
812 def readfrom(self, offset, size, num_retries, exact=False):
813 """Read up to `size` bytes from the file starting at `offset`.
816 If False (default), return less data than requested if the read
817 crosses a block boundary and the next block isn't cached. If True,
818 only return less data than requested when hitting EOF.
822 if size == 0 or offset >= self.size():
824 readsegs = locators_and_ranges(self._segments, offset, size)
825 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
830 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
832 blockview = memoryview(block)
833 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
839 if lr.locator not in locs:
840 self.parent._my_block_manager().block_prefetch(lr.locator)
845 def _repack_writes(self, num_retries):
846 """Test if the buffer block has more data than actual segments.
848 This happens when a buffered write over-writes a file range written in
849 a previous buffered write. Re-pack the buffer block for efficiency
850 and to avoid leaking information.
853 segs = self._segments
855 # Sum up the segments to get the total bytes of the file referencing
856 # into the buffer block.
857 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
858 write_total = sum([s.range_size for s in bufferblock_segs])
860 if write_total < self._current_bblock.size():
861 # There is more data in the buffer block than is actually accounted for by segments, so
862 # re-pack into a new buffer by copying over to a new buffer block.
863 contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
864 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
865 for t in bufferblock_segs:
866 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
867 t.segment_offset = new_bb.size() - t.range_size
869 self._current_bblock = new_bb
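# Worked example (illustrative): writing 10 bytes at offset 0 and then
# overwriting the first 5 puts 15 bytes into the buffer block while the file's
# segments only reference 10 of them; write_total (10) is then less than the
# block size (15), so the referenced bytes are copied into a fresh buffer
# block and the stale 5 bytes are dropped.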
873 def writeto(self, offset, data, num_retries):
874 """Write `data` to the file starting at `offset`.
876 This will update existing bytes and/or extend the size of the file as
883 if offset > self.size():
884 raise ArgumentError("Offset is past the end of the file")
886 if len(data) > config.KEEP_BLOCK_SIZE:
887 # Chunk it up into smaller writes
889 dataview = memoryview(data)
891 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
892 n += config.KEEP_BLOCK_SIZE
895 self._committed = False
897 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
898 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
900 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
901 self._repack_writes(num_retries)
902 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
903 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
904 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
906 self._current_bblock.append(data)
908 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
910 self.parent.notify(WRITE, self.parent, self.name, (self, self))
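# Illustrative arithmetic: with 64 MiB Keep blocks (the KEEP_BLOCK_SIZE used
# elsewhere in this file), a single 150 MiB writeto() call is re-issued as
# three chunked writes of 64 MiB, 64 MiB and 22 MiB at successive offsets.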
915 def flush(self, sync=True, num_retries=0):
916 """Flush the current bufferblock to Keep.
919 If True, commit block synchronously, wait until buffer block has been written.
920 If False, commit block asynchronously, return immediately after putting block into
926 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
927 if self._current_bblock.state() == _BufferBlock.WRITABLE:
928 self._repack_writes(num_retries)
929 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
933 for s in self._segments:
934 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
936 if bb.state() != _BufferBlock.COMMITTED:
937 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
938 to_delete.add(s.locator)
939 s.locator = bb.locator()
941 self.parent._my_block_manager().delete_bufferblock(s)
943 self.parent.notify(MOD, self.parent, self.name, (self, self))
947 def add_segment(self, blocks, pos, size):
948 """Add a segment to the end of the file.
950 `pos` and `size` reference a section of the stream described by
951 `blocks` (a list of Range objects)
954 self._add_segment(blocks, pos, size)
956 def _add_segment(self, blocks, pos, size):
957 """Internal implementation of add_segment."""
958 self._committed = False
959 for lr in locators_and_ranges(blocks, pos, size):
960 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
961 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
962 self._segments.append(r)
966 """Get the file size."""
968 n = self._segments[-1]
969 return n.range_start + n.range_size
974 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
977 for segment in self._segments:
978 loc = segment.locator
979 if loc.startswith("bufferblock"):
980 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
981 if portable_locators:
982 loc = KeepLocator(loc).stripped()
983 filestream.append(LocatorAndRange(loc, locator_block_size(loc),
984 segment.segment_offset, segment.range_size))
985 buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
991 def _reparent(self, newparent, newname):
992 self._committed = False
993 self.flush(sync=True)
994 self.parent.remove(self.name)
995 self.parent = newparent
997 self.lock = self.parent.root_collection().lock
1000 class ArvadosFileReader(ArvadosFileReaderBase):
1001 """Wraps ArvadosFile in a file-like object supporting reading only.
1003 Be aware that this class is NOT thread safe as there is no locking around
1004 updates to the file pointer.
1008 def __init__(self, arvadosfile, num_retries=None):
1009 super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1010 self.arvadosfile = arvadosfile
1013 return self.arvadosfile.size()
1015 def stream_name(self):
1016 return self.arvadosfile.parent.stream_name()
1018 @_FileLikeObjectBase._before_close
1020 def read(self, size=None, num_retries=None):
1021 """Read up to `size` bytes from the file and return the result.
1023 Starts at the current file position. If `size` is None, read the
1024 entire remainder of the file.
1028 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1031 self._filepos += len(rd)
1032 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1033 return ''.join(data)
1035 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1036 self._filepos += len(data)
1039 @_FileLikeObjectBase._before_close
1041 def readfrom(self, offset, size, num_retries=None):
1042 """Read up to `size` bytes from the stream, starting at the specified file offset.
1044 This method does not change the file position.
1046 return self.arvadosfile.readfrom(offset, size, num_retries)
1052 class ArvadosFileWriter(ArvadosFileReader):
1053 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1055 Be aware that this class is NOT thread safe as there is no locking around
1056 updates to the file pointer.
1060 def __init__(self, arvadosfile, mode, num_retries=None):
1061 super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1064 @_FileLikeObjectBase._before_close
1066 def write(self, data, num_retries=None):
1067 if self.mode[0] == "a":
1068 self.arvadosfile.writeto(self.size(), data, num_retries)
1070 self.arvadosfile.writeto(self._filepos, data, num_retries)
1071 self._filepos += len(data)
1074 @_FileLikeObjectBase._before_close
1076 def writelines(self, seq, num_retries=None):
1078 self.write(s, num_retries=num_retries)
1080 @_FileLikeObjectBase._before_close
1081 def truncate(self, size=None):
1083 size = self._filepos
1084 self.arvadosfile.truncate(size)
1085 if self._filepos > self.size():
1086 self._filepos = self.size()
1088 @_FileLikeObjectBase._before_close
1090 self.arvadosfile.flush()
1095 super(ArvadosFileWriter, self).close()
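# End-to-end sketch (hypothetical; assumes a writable arvados.collection
# Collection whose open() returns the file-like wrappers defined above):
#
#     with collection.open("notes.txt", "w") as writer:   # ArvadosFileWriter
#         writer.write("hello\n")
#     with collection.open("notes.txt", "r") as reader:   # ArvadosFileReader
#         print(reader.read(1024))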