14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
18 from .retry import retry_method
23 _logger = logging.getLogger('arvados.arvfile')
26 """split(path) -> streamname, filename
28 Separate the stream name and file name in a /-separated stream path and
29 return a tuple (stream_name, file_name). If no stream name is available, assume '.'.
34 stream_name, file_name = path.rsplit('/', 1)
35 except ValueError: # No / in string
36 stream_name, file_name = '.', path
37 return stream_name, file_name
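# A quick sketch of split() in use (mirroring the fallback above):
#
#     >>> split('data/reads/sample.fastq')
#     ('data/reads', 'sample.fastq')
#     >>> split('sample.fastq')
#     ('.', 'sample.fastq')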
39 class _FileLikeObjectBase(object):
40 def __init__(self, name, mode):
46 def _before_close(orig_func):
47 @functools.wraps(orig_func)
48 def before_close_wrapper(self, *args, **kwargs):
50 raise ValueError("I/O operation on closed stream file")
51 return orig_func(self, *args, **kwargs)
52 return before_close_wrapper
57 def __exit__(self, exc_type, exc_value, traceback):
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69 def __init__(self, name, mode, num_retries=None):
70 super(ArvadosFileReaderBase, self).__init__(name, mode)
72 self.num_retries = num_retries
73 self._readline_cache = (None, None)
77 data = self.readline()
82 def decompressed_name(self):
83 return re.sub(r'\.(bz2|gz)$', '', self.name)
85 @_FileLikeObjectBase._before_close
86 def seek(self, pos, whence=os.SEEK_SET):
87 if whence == os.SEEK_CUR:
89 elif whence == os.SEEK_END:
91 self._filepos = min(max(pos, 0L), self.size())
96 @_FileLikeObjectBase._before_close
98 def readall(self, size=2**20, num_retries=None):
100 data = self.read(size, num_retries=num_retries)
105 @_FileLikeObjectBase._before_close
107 def readline(self, size=float('inf'), num_retries=None):
108 cache_pos, cache_data = self._readline_cache
109 if self.tell() == cache_pos:
111 self._filepos += len(cache_data)
114 data_size = len(data[-1])
115 while (data_size < size) and ('\n' not in data[-1]):
116 next_read = self.read(2 ** 20, num_retries=num_retries)
119 data.append(next_read)
120 data_size += len(next_read)
123 nextline_index = data.index('\n') + 1
125 nextline_index = len(data)
126 nextline_index = min(nextline_index, size)
127 self._filepos -= len(data) - nextline_index
128 self._readline_cache = (self.tell(), data[nextline_index:])
129 return data[:nextline_index]
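# Implementation note: readline() above reads ahead in 1 MiB chunks and caches
# any bytes past the returned line in self._readline_cache, keyed by the new
# file position, so consecutive readline() calls do not re-fetch the same data.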
131 @_FileLikeObjectBase._before_close
133 def decompress(self, decompress, size, num_retries=None):
134 for segment in self.readall(size, num_retries=num_retries):
135 data = decompress(segment)
139 @_FileLikeObjectBase._before_close
141 def readall_decompressed(self, size=2**20, num_retries=None):
143 if self.name.endswith('.bz2'):
144 dc = bz2.BZ2Decompressor()
145 return self.decompress(dc.decompress, size,
146 num_retries=num_retries)
147 elif self.name.endswith('.gz'):
148 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
149 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
150 size, num_retries=num_retries)
152 return self.readall(size, num_retries=num_retries)
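# Usage sketch: iterating a reader whose name ends in '.gz' or '.bz2' yields
# decompressed chunks; any other name falls back to the raw readall() bytes
# ('reader' and 'process' below are illustrative names):
#
#     for chunk in reader.readall_decompressed():
#         process(chunk)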
154 @_FileLikeObjectBase._before_close
156 def readlines(self, sizehint=float('inf'), num_retries=None):
159 for s in self.readall(num_retries=num_retries):
162 if data_size >= sizehint:
164 return ''.join(data).splitlines(True)
167 raise NotImplementedError()
169 def read(self, size, num_retries=None):
170 raise NotImplementedError()
172 def readfrom(self, start, size, num_retries=None):
173 raise NotImplementedError()
176 class StreamFileReader(ArvadosFileReaderBase):
177 class _NameAttribute(str):
178 # The Python file API provides a plain .name attribute.
179 # Older SDK provided a name() method.
180 # This class provides both, for maximum compatibility.
184 def __init__(self, stream, segments, name):
185 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
186 self._stream = stream
187 self.segments = segments
189 def stream_name(self):
190 return self._stream.name()
193 n = self.segments[-1]
194 return n.range_start + n.range_size
196 @_FileLikeObjectBase._before_close
198 def read(self, size, num_retries=None):
199 """Read up to 'size' bytes from the stream, starting at the current file position"""
204 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
206 lr = available_chunks[0]
207 data = self._stream.readfrom(lr.locator+lr.segment_offset,
209 num_retries=num_retries)
211 self._filepos += len(data)
214 @_FileLikeObjectBase._before_close
216 def readfrom(self, start, size, num_retries=None):
217 """Read up to 'size' bytes from the stream, starting at 'start'"""
222 for lr in locators_and_ranges(self.segments, start, size):
223 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
224 num_retries=num_retries))
227 def as_manifest(self):
229 for r in self.segments:
230 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
231 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234 def synchronized(orig_func):
235 @functools.wraps(orig_func)
236 def synchronized_wrapper(self, *args, **kwargs):
238 return orig_func(self, *args, **kwargs)
239 return synchronized_wrapper
242 class StateChangeError(Exception):
243 def __init__(self, message, state, nextstate):
244 super(StateChangeError, self).__init__(message)
246 self.nextstate = nextstate
248 class _BufferBlock(object):
249 """A stand-in for a Keep block that is in the process of being written.
251 Writers can append to it, get the size, and compute the Keep locator.
252 There are four valid states: WRITABLE, PENDING, COMMITTED, and ERROR.
258 Block is in the process of being uploaded to Keep, append is an error.
261 The block has been written to Keep, its internal buffer has been
262 released, fetching the block will fetch it via keep client (since we
263 discarded the internal copy), and identifiers referring to the BufferBlock
264 can be replaced with the block locator.
273 def __init__(self, blockid, starting_capacity, owner):
276 the identifier for this block
279 the initial buffer capacity
282 ArvadosFile that owns this block
285 self.blockid = blockid
286 self.buffer_block = bytearray(starting_capacity)
287 self.buffer_view = memoryview(self.buffer_block)
288 self.write_pointer = 0
289 self._state = _BufferBlock.WRITABLE
292 self.lock = threading.Lock()
293 self.wait_for_commit = threading.Event()
297 def append(self, data):
298 """Append some data to the buffer.
300 Only valid if the block is in WRITABLE state. Implements an expanding
301 buffer, doubling capacity as needed to accommodate all the data.
304 if self._state == _BufferBlock.WRITABLE:
305 while (self.write_pointer+len(data)) > len(self.buffer_block):
306 new_buffer_block = bytearray(len(self.buffer_block) * 2)
307 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
308 self.buffer_block = new_buffer_block
309 self.buffer_view = memoryview(self.buffer_block)
310 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
311 self.write_pointer += len(data)
314 raise AssertionError("Buffer block is not writable")
316 STATE_TRANSITIONS = frozenset([
318 (PENDING, COMMITTED),
323 def set_state(self, nextstate, val=None):
324 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
325 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
326 self._state = nextstate
328 if self._state == _BufferBlock.PENDING:
329 self.wait_for_commit.clear()
331 if self._state == _BufferBlock.COMMITTED:
333 self.buffer_view = None
334 self.buffer_block = None
335 self.wait_for_commit.set()
337 if self._state == _BufferBlock.ERROR:
339 self.wait_for_commit.set()
346 """The amount of data written to the buffer."""
347 return self.write_pointer
351 """The Keep locator for this buffer's contents."""
352 if self._locator is None:
353 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
357 def clone(self, new_blockid, owner):
358 if self._state == _BufferBlock.COMMITTED:
359 raise AssertionError("Cannot duplicate committed buffer block")
360 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
361 bufferblock.append(self.buffer_view[0:self.size()])
367 self.buffer_block = None
368 self.buffer_view = None
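# A minimal sketch of the _BufferBlock lifecycle when driven directly (it is
# normally managed by a _BlockManager; the locator below is a placeholder):
#
#     >>> bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#     >>> bb.append("hello")
#     >>> bb.size()
#     5
#     >>> bb.set_state(_BufferBlock.PENDING)      # append() is now an error
#     >>> bb.set_state(_BufferBlock.COMMITTED, "d41d8cd98f00b204e9800998ecf8427e+5")
#     >>> bb.state() == _BufferBlock.COMMITTED
#     True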
371 class NoopLock(object):
375 def __exit__(self, exc_type, exc_value, traceback):
378 def acquire(self, blocking=False):
385 def must_be_writable(orig_func):
386 @functools.wraps(orig_func)
387 def must_be_writable_wrapper(self, *args, **kwargs):
388 if not self.writable():
389 raise IOError(errno.EROFS, "Collection is read-only.")
390 return orig_func(self, *args, **kwargs)
391 return must_be_writable_wrapper
394 class _BlockManager(object):
395 """BlockManager handles buffer blocks.
397 Also handles background block uploads, and background block prefetch for a
398 Collection of ArvadosFiles.
402 DEFAULT_PUT_THREADS = 2
403 DEFAULT_GET_THREADS = 2
405 def __init__(self, keep, copies=None):
406 """keep: KeepClient object to use"""
408 self._bufferblocks = {}
409 self._put_queue = None
410 self._put_threads = None
411 self._prefetch_queue = None
412 self._prefetch_threads = None
413 self.lock = threading.Lock()
414 self.prefetch_enabled = True
415 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
416 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
420 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
421 """Allocate a new, empty bufferblock in WRITABLE state and return it.
424 optional block identifier, otherwise one will be automatically assigned
427 optional capacity, otherwise will use default capacity
430 ArvadosFile that owns this block
434 blockid = "bufferblock%i" % len(self._bufferblocks)
435 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
436 self._bufferblocks[bufferblock.blockid] = bufferblock
440 def dup_block(self, block, owner):
441 """Create a new bufferblock initialized with the content of an existing bufferblock.
444 the buffer block to copy.
447 ArvadosFile that owns the new block
450 new_blockid = "bufferblock%i" % len(self._bufferblocks)
451 bufferblock = block.clone(new_blockid, owner)
452 self._bufferblocks[bufferblock.blockid] = bufferblock
456 def is_bufferblock(self, locator):
457 return locator in self._bufferblocks
459 def _commit_bufferblock_worker(self):
460 """Background uploader thread."""
464 bufferblock = self._put_queue.get()
465 if bufferblock is None:
468 if self.copies is None:
469 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
471 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
472 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
474 except Exception as e:
475 bufferblock.set_state(_BufferBlock.ERROR, e)
477 if self._put_queue is not None:
478 self._put_queue.task_done()
481 def start_put_threads(self):
482 if self._put_threads is None:
483 # Start uploader threads.
485 # If we don't limit the Queue size, the upload queue can quickly
486 # grow to take up gigabytes of RAM if the writing process is
487 generating data more quickly than it can be sent to the Keep
490 # With two upload threads and a queue size of 2, this means up to 4
491 # blocks pending. If they are full 64 MiB blocks, that means up to
492 # 256 MiB of internal buffering, which is the same size as the
493 # default download block cache in KeepClient.
494 self._put_queue = Queue.Queue(maxsize=2)
496 self._put_threads = []
497 for i in xrange(0, self.num_put_threads):
498 thread = threading.Thread(target=self._commit_bufferblock_worker)
499 self._put_threads.append(thread)
503 def _block_prefetch_worker(self):
504 """The background downloader thread."""
507 b = self._prefetch_queue.get()
515 def start_get_threads(self):
516 if self._prefetch_threads is None:
517 self._prefetch_queue = Queue.Queue()
518 self._prefetch_threads = []
519 for i in xrange(0, self.num_get_threads):
520 thread = threading.Thread(target=self._block_prefetch_worker)
521 self._prefetch_threads.append(thread)
527 def stop_threads(self):
528 """Shut down and wait for background upload and download threads to finish."""
530 if self._put_threads is not None:
531 for t in self._put_threads:
532 self._put_queue.put(None)
533 for t in self._put_threads:
535 self._put_threads = None
536 self._put_queue = None
538 if self._prefetch_threads is not None:
539 for t in self._prefetch_threads:
540 self._prefetch_queue.put(None)
541 for t in self._prefetch_threads:
543 self._prefetch_threads = None
544 self._prefetch_queue = None
549 def __exit__(self, exc_type, exc_value, traceback):
552 def commit_bufferblock(self, block, sync):
553 """Initiate a background upload of a bufferblock.
556 The block object to upload
559 If `sync` is True, upload the block synchronously.
560 If `sync` is False, upload the block asynchronously. This will
561 return immediately unless the upload queue is at capacity, in
562 which case it will wait on an upload queue slot.
567 # Mark the block as PENDING to disallow any more appends.
568 block.set_state(_BufferBlock.PENDING)
569 except StateChangeError as e:
570 if e.state == _BufferBlock.PENDING:
572 block.wait_for_commit.wait()
575 if block.state() == _BufferBlock.COMMITTED:
577 elif block.state() == _BufferBlock.ERROR:
584 if self.copies is None:
585 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
587 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
588 block.set_state(_BufferBlock.COMMITTED, loc)
589 except Exception as e:
590 block.set_state(_BufferBlock.ERROR, e)
593 self.start_put_threads()
594 self._put_queue.put(block)
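# Queue.put() blocks while the queue is at maxsize (2), so a writer that
# outruns the uploader threads is throttled here instead of buffering an
# unbounded number of blocks in RAM.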
597 def get_bufferblock(self, locator):
598 return self._bufferblocks.get(locator)
601 def delete_bufferblock(self, locator):
602 bb = self._bufferblocks[locator]
604 del self._bufferblocks[locator]
606 def get_block_contents(self, locator, num_retries, cache_only=False):
609 First checks whether the locator refers to a BufferBlock and, if so, returns
610 its contents; otherwise passes the request through to KeepClient.get().
614 if locator in self._bufferblocks:
615 bufferblock = self._bufferblocks[locator]
616 if bufferblock.state() != _BufferBlock.COMMITTED:
617 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
619 locator = bufferblock._locator
621 return self._keep.get_from_cache(locator)
623 return self._keep.get(locator, num_retries=num_retries)
625 def commit_all(self):
626 """Commit all outstanding buffer blocks.
628 This is a synchronous call, and will not return until all buffer blocks
629 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
633 items = self._bufferblocks.items()
636 if v.state() != _BufferBlock.COMMITTED:
637 v.owner.flush(sync=False)
640 if self._put_queue is not None:
641 self._put_queue.join()
645 if v.state() == _BufferBlock.ERROR:
646 err.append((v.locator(), v.error))
648 raise KeepWriteError("Error writing some blocks", err, label="block")
651 # flush again with sync=True to remove committed bufferblocks from
654 v.owner.flush(sync=True)
656 def block_prefetch(self, locator):
657 """Initiate a background download of a block.
659 This assumes that the underlying KeepClient implements a block cache,
660 so repeated requests for the same block will not result in repeated
661 downloads (unless the block is evicted from the cache). This method
666 if not self.prefetch_enabled:
669 if self._keep.get_from_cache(locator) is not None:
673 if locator in self._bufferblocks:
676 self.start_get_threads()
677 self._prefetch_queue.put(locator)
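# A minimal sketch of the synchronous write path through _BlockManager
# (keep_client is an assumed KeepClient instance; owner would normally be an
# ArvadosFile rather than None):
#
#     >>> bm = _BlockManager(keep_client)
#     >>> bb = bm.alloc_bufferblock(owner=None)
#     >>> bb.append("some data")
#     >>> bm.commit_bufferblock(bb, sync=True)    # uploads now; bb becomes COMMITTED
#     >>> bm.stop_threads()                       # shut down any background threads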
680 class ArvadosFile(object):
681 """Represent a file in a Collection.
683 ArvadosFile manages the underlying representation of a file in Keep as a
684 sequence of segments spanning a set of blocks, and implements random
687 This object may be accessed from multiple threads.
691 def __init__(self, parent, name, stream=[], segments=[]):
693 ArvadosFile constructor.
696 a list of Range objects representing a block stream
699 a list of Range objects representing segments
703 self._committed = False
705 self.lock = parent.root_collection().lock
707 self._add_segment(stream, s.locator, s.range_size)
708 self._current_bblock = None
711 return self.parent.writable()
715 return copy.copy(self._segments)
718 def clone(self, new_parent, new_name):
719 """Make a copy of this file."""
720 cp = ArvadosFile(new_parent, new_name)
721 cp.replace_contents(self)
726 def replace_contents(self, other):
727 """Replace segments of this file with segments from another `ArvadosFile` object."""
731 for other_segment in other.segments():
732 new_loc = other_segment.locator
733 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
734 if other_segment.locator not in map_loc:
735 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
736 if bufferblock.state() != _BufferBlock.WRITABLE:
737 map_loc[other_segment.locator] = bufferblock.locator()
739 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
740 new_loc = map_loc[other_segment.locator]
742 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
744 self._committed = False
746 def __eq__(self, other):
749 if not isinstance(other, ArvadosFile):
752 othersegs = other.segments()
754 if len(self._segments) != len(othersegs):
756 for i in xrange(0, len(othersegs)):
757 seg1 = self._segments[i]
762 if self.parent._my_block_manager().is_bufferblock(loc1):
763 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
765 if other.parent._my_block_manager().is_bufferblock(loc2):
766 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
768 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
769 seg1.range_start != seg2.range_start or
770 seg1.range_size != seg2.range_size or
771 seg1.segment_offset != seg2.segment_offset):
776 def __ne__(self, other):
777 return not self.__eq__(other)
780 def set_committed(self):
781 """Set committed flag to False"""
782 self._committed = True
786 """Get whether this is committed or not."""
787 return self._committed
791 def truncate(self, size):
792 """Shrink the size of the file.
794 If `size` is less than the size of the file, the file contents after
795 `size` will be discarded. If `size` is greater than the current size
796 of the file, an IOError will be raised.
799 if size < self.size():
801 for r in self._segments:
802 range_end = r.range_start+r.range_size
803 if r.range_start >= size:
804 # segment is past the truncate size, all done
806 elif size < range_end:
807 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
808 nr.segment_offset = r.segment_offset
814 self._segments = new_segs
815 self._committed = False
816 elif size > self.size():
817 raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
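# For example: on a 100-byte file, truncate(10) keeps the first 10 bytes and
# marks the file uncommitted, while truncate(200) raises IOError because
# truncate() cannot extend a file.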
819 def readfrom(self, offset, size, num_retries, exact=False):
820 """Read up to `size` bytes from the file starting at `offset`.
823 If False (default), return less data than requested if the read
824 crosses a block boundary and the next block isn't cached. If True,
825 only return less data than requested when hitting EOF.
829 if size == 0 or offset >= self.size():
831 readsegs = locators_and_ranges(self._segments, offset, size)
832 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
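# The prefetch ranges computed above cover up to one KEEP_BLOCK_SIZE past the
# requested read (at most 32 segments); they are handed to block_prefetch()
# below so sequential readers rarely stall on the next block.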
837 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
839 blockview = memoryview(block)
840 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
846 if lr.locator not in locs:
847 self.parent._my_block_manager().block_prefetch(lr.locator)
852 def _repack_writes(self, num_retries):
853 """Test if the buffer block has more data than actual segments.
855 This happens when a buffered write over-writes a file range written in
856 a previous buffered write. Re-pack the buffer block for efficiency
857 and to avoid leaking information.
860 segs = self._segments
862 # Sum up the segments to get the total bytes of the file referencing
863 # into the buffer block.
864 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
865 write_total = sum([s.range_size for s in bufferblock_segs])
867 if write_total < self._current_bblock.size():
868 # There is more data in the buffer block than is actually accounted for by segments, so
869 # re-pack into a new buffer by copying over to a new buffer block.
870 contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
871 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
872 for t in bufferblock_segs:
873 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
874 t.segment_offset = new_bb.size() - t.range_size
876 self._current_bblock = new_bb
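# Concretely: if the same 64-byte range is written twice, the buffer block
# holds 128 appended bytes but the segments reference only 64 of them;
# repacking copies just the referenced bytes into a fresh block so the stale
# data is never uploaded to Keep.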
880 def writeto(self, offset, data, num_retries):
881 """Write `data` to the file starting at `offset`.
883 This will update existing bytes and/or extend the size of the file as
890 if offset > self.size():
891 raise ArgumentError("Offset is past the end of the file")
893 if len(data) > config.KEEP_BLOCK_SIZE:
894 # Chunk it up into smaller writes
896 dataview = memoryview(data)
898 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
899 n += config.KEEP_BLOCK_SIZE
902 self._committed = False
904 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
905 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
907 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
908 self._repack_writes(num_retries)
909 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
910 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
911 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
913 self._current_bblock.append(data)
915 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
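# The bytes just appended occupy the tail of the current bufferblock, so
# replace_range() splices the segment list to make [offset, offset+len(data))
# point at (blockid, write_pointer - len(data)).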
917 self.parent.notify(WRITE, self.parent, self.name, (self, self))
922 def flush(self, sync=True, num_retries=0):
923 """Flush the current bufferblock to Keep.
926 If True, commit block synchronously, wait until buffer block has been written.
927 If False, commit block asynchronously, return immediately after putting block into
933 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
934 if self._current_bblock.state() == _BufferBlock.WRITABLE:
935 self._repack_writes(num_retries)
936 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
940 for s in self._segments:
941 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
943 if bb.state() != _BufferBlock.COMMITTED:
944 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
945 to_delete.add(s.locator)
946 s.locator = bb.locator()
948 self.parent._my_block_manager().delete_bufferblock(s)
950 self.parent.notify(MOD, self.parent, self.name, (self, self))
954 def add_segment(self, blocks, pos, size):
955 """Add a segment to the end of the file.
957 `pos` and `size` reference a section of the stream described by
958 `blocks` (a list of Range objects)
961 self._add_segment(blocks, pos, size)
963 def _add_segment(self, blocks, pos, size):
964 """Internal implementation of add_segment."""
965 self._committed = False
966 for lr in locators_and_ranges(blocks, pos, size):
967 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
968 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
969 self._segments.append(r)
973 """Get the file size."""
975 n = self._segments[-1]
976 return n.range_start + n.range_size
981 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
984 for segment in self._segments:
985 loc = segment.locator
986 if loc.startswith("bufferblock"):
987 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
988 if portable_locators:
989 loc = KeepLocator(loc).stripped()
990 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
991 segment.segment_offset, segment.range_size))
992 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
998 def _reparent(self, newparent, newname):
999 self._committed = False
1000 self.flush(sync=True)
1001 self.parent.remove(self.name)
1002 self.parent = newparent
1004 self.lock = self.parent.root_collection().lock
1007 class ArvadosFileReader(ArvadosFileReaderBase):
1008 """Wraps ArvadosFile in a file-like object supporting reading only.
1010 Be aware that this class is NOT thread safe as there is no locking around
1011 updates to the file pointer.
1015 def __init__(self, arvadosfile, num_retries=None):
1016 super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1017 self.arvadosfile = arvadosfile
1020 return self.arvadosfile.size()
1022 def stream_name(self):
1023 return self.arvadosfile.parent.stream_name()
1025 @_FileLikeObjectBase._before_close
1027 def read(self, size=None, num_retries=None):
1028 """Read up to `size` bytes from the file and return the result.
1030 Starts at the current file position. If `size` is None, read the
1031 entire remainder of the file.
1035 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1038 self._filepos += len(rd)
1039 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1040 return ''.join(data)
1042 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1043 self._filepos += len(data)
1046 @_FileLikeObjectBase._before_close
1048 def readfrom(self, offset, size, num_retries=None):
1049 """Read up to `size` bytes from the stream, starting at the specified file offset.
1051 This method does not change the file position.
1053 return self.arvadosfile.readfrom(offset, size, num_retries)
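# A minimal usage sketch (assuming 'collection' is an arvados.collection
# Collection object; open() in read mode returns an ArvadosFileReader):
#
#     >>> reader = collection.open("subdir/data.txt", "r")
#     >>> first_line = reader.readline()
#     >>> more = reader.read(1024)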
1059 class ArvadosFileWriter(ArvadosFileReader):
1060 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1062 Be aware that this class is NOT thread safe as there is no locking around
1063 updates to the file pointer.
1067 def __init__(self, arvadosfile, mode, num_retries=None):
1068 super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1071 @_FileLikeObjectBase._before_close
1073 def write(self, data, num_retries=None):
1074 if self.mode[0] == "a":
1075 self.arvadosfile.writeto(self.size(), data, num_retries)
1077 self.arvadosfile.writeto(self._filepos, data, num_retries)
1078 self._filepos += len(data)
1081 @_FileLikeObjectBase._before_close
1083 def writelines(self, seq, num_retries=None):
1085 self.write(s, num_retries=num_retries)
1087 @_FileLikeObjectBase._before_close
1088 def truncate(self, size=None):
1090 size = self._filepos
1091 self.arvadosfile.truncate(size)
1092 if self._filepos > self.size():
1093 self._filepos = self.size()
1095 @_FileLikeObjectBase._before_close
1097 self.arvadosfile.flush()
1102 super(ArvadosFileWriter, self).close()
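# A minimal usage sketch for the writer side (again assuming 'collection' is a
# writable Collection object; open() in write or append mode returns an
# ArvadosFileWriter):
#
#     >>> writer = collection.open("output/results.txt", "w")
#     >>> writer.write("hello\n")
#     >>> writer.writelines(["a\n", "b\n"])
#     >>> writer.close()    # close() flushes buffered blocks to Keep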