from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import range
from builtins import object
import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
34 """split(path) -> streamname, filename
36 Separate the stream name and file name in a /-separated stream path and
37 return a tuple (stream_name, file_name). If no stream name is available,
42 stream_name, file_name = path.rsplit('/', 1)
43 except ValueError: # No / in string
44 stream_name, file_name = '.', path
45 return stream_name, file_name
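
# For illustration, the two cases handled above (hypothetical inputs):
#
#     split("sub/dir/file.txt")  # -> ('sub/dir', 'file.txt')
#     split("file.txt")          # -> ('.', 'file.txt')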


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper
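
    # Illustration (hypothetical subclass): once close() has set self.closed,
    # any method wrapped with _before_close raises immediately:
    #
    #     f = SomeFileLikeSubclass("example.txt", "rb")
    #     f.close()
    #     f.read(16)  # -> ValueError: I/O operation on closed stream file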

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
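
# For illustration (hypothetical values), as_manifest() produces one manifest
# line per stream, e.g. for a single 3-byte file:
#
#     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
#
# i.e. stream name, block locator(s), then file_offset:length:filename tokens.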


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
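
    # Worked example of the doubling policy above (hypothetical numbers):
    # with a 16 KiB buffer and write_pointer at 12 KiB, appending 40 KiB
    # doubles the capacity twice (16 -> 32 -> 64 KiB) before the data fits.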

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
362 """The amount of data written to the buffer."""
363 return self.write_pointer
367 """The Keep locator for this buffer's contents."""
368 if self._locator is None:
369 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
373 def clone(self, new_blockid, owner):
374 if self._state == _BufferBlock.COMMITTED:
375 raise AssertionError("Cannot duplicate committed buffer block")
376 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
377 bufferblock.append(self.buffer_view[0:self.size()])
383 self.buffer_block = None
384 self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # service.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
            # Search blocks ready for getting packed together before being
            # committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
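
    # Worked example (hypothetical sizes): with KEEP_BLOCK_SIZE = 64 MiB,
    # three closed files buffering 20, 20 and 30 MiB total 70 MiB, which
    # triggers repacking; the loop packs 20 + 20 = 40 MiB into one new
    # bufferblock (adding the 30 MiB block would exceed 64 MiB) and rewrites
    # each packed file's segments to point at its offset in the new block.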

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = list(self._bufferblocks.items())

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)
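
    # Behavior sketch (illustrative): with exact=False, a read that spans two
    # blocks returns only the bytes from the first block if the second is not
    # yet in the local cache (the cache_only flag above); by the time the
    # caller reads again, the prefetch requests issued here have usually
    # warmed the cache.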

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack into a new buffer by copying over to a
            # new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
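
    # Worked example of the chunking above (hypothetical sizes): with
    # KEEP_BLOCK_SIZE = 64 MiB, writeto(0, <150 MiB of data>, ...) recurses
    # into three writes of 64, 64 and 22 MiB, so no single bufferblock append
    # exceeds one Keep block.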

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
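

# A minimal usage sketch (assumes an arvados.collection.Collection instance;
# the file path and contents below are illustrative only):
#
#     import arvados.collection
#     c = arvados.collection.Collection()
#     with c.open("dir/file.txt", "wb") as f:  # returns an ArvadosFileWriter
#         f.write(b"hello")
#     with c.open("dir/file.txt", "rb") as f:  # returns an ArvadosFileReader
#         print(f.read(5))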