1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
21 from .errors import KeepWriteError, AssertionError, ArgumentError
22 from .keep import KeepLocator
23 from ._normalize_stream import normalize_stream
24 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
25 from .retry import retry_method
30 _logger = logging.getLogger('arvados.arvfile')
33 """split(path) -> streamname, filename
35 Separate the stream name and file name in a /-separated stream path and
36 return a tuple (stream_name, file_name). If no stream name is available,
41 stream_name, file_name = path.rsplit('/', 1)
42 except ValueError: # No / in string
43 stream_name, file_name = '.', path
44 return stream_name, file_name
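# Illustrative behavior of split(), following the rsplit() call above:
#
#     split("data/reads/sample1.fastq")  ->  ("data/reads", "sample1.fastq")
#     split("sample1.fastq")             ->  (".", "sample1.fastq")
#
# Only the last '/' separates the stream name from the file name, so nested
# stream paths remain intact in the first element.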
47 class UnownedBlockError(Exception):
48 """Raised when there's an writable block without an owner on the BlockManager."""
52 class _FileLikeObjectBase(object):
53 def __init__(self, name, mode):
59 def _before_close(orig_func):
60 @functools.wraps(orig_func)
61 def before_close_wrapper(self, *args, **kwargs):
63 raise ValueError("I/O operation on closed stream file")
64 return orig_func(self, *args, **kwargs)
65 return before_close_wrapper
70 def __exit__(self, exc_type, exc_value, traceback):
81 class ArvadosFileReaderBase(_FileLikeObjectBase):
82 def __init__(self, name, mode, num_retries=None):
83 super(ArvadosFileReaderBase, self).__init__(name, mode)
85 self.num_retries = num_retries
86 self._readline_cache = (None, None)
90 data = self.readline()
95 def decompressed_name(self):
96 return re.sub(r'\.(bz2|gz)$', '', self.name)
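# For example, given the pattern above, a file named "reads.fastq.gz" has
# decompressed_name() == "reads.fastq", while a name without a .gz or .bz2
# suffix is returned unchanged.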
98 @_FileLikeObjectBase._before_close
99 def seek(self, pos, whence=os.SEEK_SET):
100 if whence == os.SEEK_CUR:
102 elif whence == os.SEEK_END:
105 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
121 @_FileLikeObjectBase._before_close
123 def readall(self, size=2**20, num_retries=None):
125 data = self.read(size, num_retries=num_retries)
130 @_FileLikeObjectBase._before_close
132 def readline(self, size=float('inf'), num_retries=None):
133 cache_pos, cache_data = self._readline_cache
134 if self.tell() == cache_pos:
136 self._filepos += len(cache_data)
139 data_size = len(data[-1])
140 while (data_size < size) and (b'\n' not in data[-1]):
141 next_read = self.read(2 ** 20, num_retries=num_retries)
144 data.append(next_read)
145 data_size += len(next_read)
146 data = b''.join(data)
148 nextline_index = data.index(b'\n') + 1
150 nextline_index = len(data)
151 nextline_index = min(nextline_index, size)
152 self._filepos -= len(data) - nextline_index
153 self._readline_cache = (self.tell(), data[nextline_index:])
154 return data[:nextline_index].decode()
156 @_FileLikeObjectBase._before_close
158 def decompress(self, decompress, size, num_retries=None):
159 for segment in self.readall(size, num_retries=num_retries):
160 data = decompress(segment)
164 @_FileLikeObjectBase._before_close
166 def readall_decompressed(self, size=2**20, num_retries=None):
168 if self.name.endswith('.bz2'):
169 dc = bz2.BZ2Decompressor()
170 return self.decompress(dc.decompress, size,
171 num_retries=num_retries)
172 elif self.name.endswith('.gz'):
173 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
174 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
175 size, num_retries=num_retries)
177 return self.readall(size, num_retries=num_retries)
179 @_FileLikeObjectBase._before_close
181 def readlines(self, sizehint=float('inf'), num_retries=None):
184 for s in self.readall(num_retries=num_retries):
187 if data_size >= sizehint:
189 return b''.join(data).decode().splitlines(True)
192 raise IOError(errno.ENOSYS, "Not implemented")
194 def read(self, size, num_retries=None):
195 raise IOError(errno.ENOSYS, "Not implemented")
197 def readfrom(self, start, size, num_retries=None):
198 raise IOError(errno.ENOSYS, "Not implemented")
201 class StreamFileReader(ArvadosFileReaderBase):
202 class _NameAttribute(str):
203 # The Python file API provides a plain .name attribute.
204 # Older SDK provided a name() method.
205 # This class provides both, for maximum compatibility.
209 def __init__(self, stream, segments, name):
210 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
211 self._stream = stream
212 self.segments = segments
214 def stream_name(self):
215 return self._stream.name()
218 n = self.segments[-1]
219 return n.range_start + n.range_size
221 @_FileLikeObjectBase._before_close
223 def read(self, size, num_retries=None):
224 """Read up to 'size' bytes from the stream, starting at the current file position"""
229 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
231 lr = available_chunks[0]
232 data = self._stream.readfrom(lr.locator+lr.segment_offset,
234 num_retries=num_retries)
236 self._filepos += len(data)
239 @_FileLikeObjectBase._before_close
241 def readfrom(self, start, size, num_retries=None):
242 """Read up to 'size' bytes from the stream, starting at 'start'"""
247 for lr in locators_and_ranges(self.segments, start, size):
248 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
249 num_retries=num_retries))
250 return b''.join(data)
252 def as_manifest(self):
254 for r in self.segments:
255 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
256 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
259 def synchronized(orig_func):
260 @functools.wraps(orig_func)
261 def synchronized_wrapper(self, *args, **kwargs):
263 return orig_func(self, *args, **kwargs)
264 return synchronized_wrapper
267 class StateChangeError(Exception):
268 def __init__(self, message, state, nextstate):
269 super(StateChangeError, self).__init__(message)
271 self.nextstate = nextstate
273 class _BufferBlock(object):
274 """A stand-in for a Keep block that is in the process of being written.
276 Writers can append to it, get the size, and compute the Keep locator.
277 There are three valid states:
283 Block is in the process of being uploaded to Keep, append is an error.
286 The block has been written to Keep; its internal buffer has been
287 released; fetching the block will fetch it via the Keep client (since we
288 discarded the internal copy); and identifiers referring to the BufferBlock
289 can be replaced with the block locator.
299 def __init__(self, blockid, starting_capacity, owner):
302 the identifier for this block
305 the initial buffer capacity
308 ArvadosFile that owns this block
311 self.blockid = blockid
312 self.buffer_block = bytearray(starting_capacity)
313 self.buffer_view = memoryview(self.buffer_block)
314 self.write_pointer = 0
315 self._state = _BufferBlock.WRITABLE
318 self.lock = threading.Lock()
319 self.wait_for_commit = threading.Event()
323 def append(self, data):
324 """Append some data to the buffer.
326 Only valid if the block is in WRITABLE state. Implements an expanding
327 buffer, doubling capacity as needed to accommodate all the data.
330 if self._state == _BufferBlock.WRITABLE:
331 if not isinstance(data, bytes) and not isinstance(data, memoryview):
333 while (self.write_pointer+len(data)) > len(self.buffer_block):
334 new_buffer_block = bytearray(len(self.buffer_block) * 2)
335 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
336 self.buffer_block = new_buffer_block
337 self.buffer_view = memoryview(self.buffer_block)
338 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
339 self.write_pointer += len(data)
342 raise AssertionError("Buffer block is not writable")
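# Illustrative capacity growth (derived from the doubling loop above): a
# block created with the default 2**14-byte capacity that receives a
# single 100,000-byte append grows 16384 -> 32768 -> 65536 -> 131072
# bytes before the data is copied in.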
344 STATE_TRANSITIONS = frozenset([
346 (PENDING, COMMITTED),
351 def set_state(self, nextstate, val=None):
352 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
353 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
354 self._state = nextstate
356 if self._state == _BufferBlock.PENDING:
357 self.wait_for_commit.clear()
359 if self._state == _BufferBlock.COMMITTED:
361 self.buffer_view = None
362 self.buffer_block = None
363 self.wait_for_commit.set()
365 if self._state == _BufferBlock.ERROR:
367 self.wait_for_commit.set()
374 """The amount of data written to the buffer."""
375 return self.write_pointer
379 """The Keep locator for this buffer's contents."""
380 if self._locator is None:
381 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
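# The computed locator has the form "<md5 of buffered bytes>+<size>".
# For example, an empty buffer yields
# "d41d8cd98f00b204e9800998ecf8427e+0", and the all-zero 64 MiB padding
# block used by _BlockManager.get_padding_block() below yields
# "7f614da9329cd3aebf59b91aadc30bf0+67108864".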
385 def clone(self, new_blockid, owner):
386 if self._state == _BufferBlock.COMMITTED:
387 raise AssertionError("Cannot duplicate committed buffer block")
388 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
389 bufferblock.append(self.buffer_view[0:self.size()])
394 self._state = _BufferBlock.DELETED
396 self.buffer_block = None
397 self.buffer_view = None
400 def repack_writes(self):
401 """Optimize buffer block by repacking segments in file sequence.
403 When the client makes random writes, they appear in the buffer block in
404 the sequence they were written rather than the sequence they appear in
405 the file. This makes for inefficient, fragmented manifests. Attempt
406 to optimize by repacking writes in file sequence.
409 if self._state != _BufferBlock.WRITABLE:
410 raise AssertionError("Cannot repack non-writable block")
412 segs = self.owner.segments()
414 # Collect the segments that reference the buffer block.
415 bufferblock_segs = [s for s in segs if s.locator == self.blockid]
417 # Collect total data referenced by segments (could be smaller than
418 # bufferblock size if a portion of the file was written and
420 write_total = sum([s.range_size for s in bufferblock_segs])
422 if write_total < self.size() or len(bufferblock_segs) > 1:
423 # If there's more than one segment referencing this block, it is
424 # due to out-of-order writes and will produce a fragmented
425 # manifest, so try to optimize by re-packing into a new buffer.
426 contents = self.buffer_view[0:self.write_pointer].tobytes()
427 new_bb = _BufferBlock(None, write_total, None)
428 for t in bufferblock_segs:
429 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
430 t.segment_offset = new_bb.size() - t.range_size
432 self.buffer_block = new_bb.buffer_block
433 self.buffer_view = new_bb.buffer_view
434 self.write_pointer = new_bb.write_pointer
437 self.owner.set_segments(segs)
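# Illustrative case (hedged): if a caller writes bytes 1024..2047 first and
# bytes 0..1023 second, the buffer holds the data in write order and the
# file is described by two segments into this block. repack_writes()
# rebuilds the buffer in file order, so the resulting manifest needs only
# one contiguous segment for the block instead of a fragmented pair.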
440 return "<BufferBlock %s>" % (self.blockid)
443 class NoopLock(object):
447 def __exit__(self, exc_type, exc_value, traceback):
450 def acquire(self, blocking=False):
457 def must_be_writable(orig_func):
458 @functools.wraps(orig_func)
459 def must_be_writable_wrapper(self, *args, **kwargs):
460 if not self.writable():
461 raise IOError(errno.EROFS, "Collection is read-only.")
462 return orig_func(self, *args, **kwargs)
463 return must_be_writable_wrapper
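# A sketch of how these guard decorators are intended to be stacked on
# mutating ArvadosFile methods later in this module (hedged illustration,
# not a verbatim copy of any one method):
#
#     @must_be_writable      # reject calls on read-only collections
#     @synchronized          # serialize access through self.lock
#     def writeto(self, offset, data, num_retries):
#         ...
#
# With must_be_writable outermost, the read-only check runs before the
# wrapped call takes the lock.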
466 class _BlockManager(object):
467 """BlockManager handles buffer blocks.
469 Also handles background block uploads, and background block prefetch for a
470 Collection of ArvadosFiles.
474 DEFAULT_PUT_THREADS = 2
476 def __init__(self, keep,
480 storage_classes_func=None):
481 """keep: KeepClient object to use"""
483 self._bufferblocks = collections.OrderedDict()
484 self._put_queue = None
485 self._put_threads = None
486 self.lock = threading.Lock()
487 self.prefetch_lookahead = self._keep.num_prefetch_threads
488 self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
490 self.storage_classes = storage_classes_func or (lambda: [])
491 self._pending_write_size = 0
492 self.threads_lock = threading.Lock()
493 self.padding_block = None
494 self.num_retries = num_retries
497 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
498 """Allocate a new, empty bufferblock in WRITABLE state and return it.
501 optional block identifier, otherwise one will be automatically assigned
504 optional capacity, otherwise will use default capacity
507 ArvadosFile that owns this block
510 return self._alloc_bufferblock(blockid, starting_capacity, owner)
512 def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
514 blockid = str(uuid.uuid4())
515 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
516 self._bufferblocks[bufferblock.blockid] = bufferblock
520 def dup_block(self, block, owner):
521 """Create a new bufferblock initialized with the content of an existing bufferblock.
524 the buffer block to copy.
527 ArvadosFile that owns the new block
530 new_blockid = str(uuid.uuid4())
531 bufferblock = block.clone(new_blockid, owner)
532 self._bufferblocks[bufferblock.blockid] = bufferblock
536 def is_bufferblock(self, locator):
537 return locator in self._bufferblocks
539 def _commit_bufferblock_worker(self):
540 """Background uploader thread."""
544 bufferblock = self._put_queue.get()
545 if bufferblock is None:
548 if self.copies is None:
549 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
551 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
552 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
553 except Exception as e:
554 bufferblock.set_state(_BufferBlock.ERROR, e)
556 if self._put_queue is not None:
557 self._put_queue.task_done()
559 def start_put_threads(self):
560 with self.threads_lock:
561 if self._put_threads is None:
562 # Start uploader threads.
564 # If we don't limit the Queue size, the upload queue can quickly
565 # grow to take up gigabytes of RAM if the writing process is
566 # generating data more quickly than it can be sent to the Keep
569 # With two upload threads and a queue size of 2, this means up to 4
570 # blocks pending. If they are full 64 MiB blocks, that means up to
571 # 256 MiB of internal buffering, which is the same size as the
572 # default download block cache in KeepClient.
573 self._put_queue = queue.Queue(maxsize=2)
575 self._put_threads = []
576 for i in range(0, self.num_put_threads):
577 thread = threading.Thread(target=self._commit_bufferblock_worker)
578 self._put_threads.append(thread)
583 def stop_threads(self):
584 """Shut down and wait for background upload and download threads to finish."""
586 if self._put_threads is not None:
587 for t in self._put_threads:
588 self._put_queue.put(None)
589 for t in self._put_threads:
591 self._put_threads = None
592 self._put_queue = None
597 def __exit__(self, exc_type, exc_value, traceback):
601 def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
602 """Packs small blocks together before uploading"""
604 self._pending_write_size += closed_file_size
606 # Check if there are enough small blocks for filling up one in full
607 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
610 # Search for blocks that are ready to be packed together before being
612 # A WRITABLE block always has an owner.
613 # A WRITABLE block with its owner.closed() implies that its
614 # size is <= KEEP_BLOCK_SIZE/2.
616 small_blocks = [b for b in self._bufferblocks.values()
617 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
618 except AttributeError:
619 # Writable blocks without an owner shouldn't exist.
620 raise UnownedBlockError()
622 if len(small_blocks) <= 1:
623 # Not enough small blocks for repacking
626 for bb in small_blocks:
629 # Update the pending write size count with its true value, just in case
630 # some small file was opened, written and closed several times.
631 self._pending_write_size = sum([b.size() for b in small_blocks])
633 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
636 new_bb = self._alloc_bufferblock()
639 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
640 bb = small_blocks.pop(0)
641 new_bb.owner.append(bb.owner)
642 self._pending_write_size -= bb.size()
643 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
644 files.append((bb, new_bb.write_pointer - bb.size()))
646 self.commit_bufferblock(new_bb, sync=sync)
648 for bb, new_bb_segment_offset in files:
649 newsegs = bb.owner.segments()
651 if s.locator == bb.blockid:
652 s.locator = new_bb.blockid
653 s.segment_offset = new_bb_segment_offset+s.segment_offset
654 bb.owner.set_segments(newsegs)
655 self._delete_bufferblock(bb.blockid)
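# Illustrative run of the packing loop above (hedged numbers): with three
# closed 30 MiB files pending (90 MiB total, over KEEP_BLOCK_SIZE), the
# first two are copied into the new 60 MiB bufferblock; the third would
# push it past 64 MiB, so it stays queued for a later repacking pass.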
657 def commit_bufferblock(self, block, sync):
658 """Initiate a background upload of a bufferblock.
661 The block object to upload
664 If `sync` is True, upload the block synchronously.
665 If `sync` is False, upload the block asynchronously. This will
666 return immediately unless the upload queue is at capacity, in
667 which case it will wait on an upload queue slot.
671 # Mark the block as PENDING so as to disallow any more appends.
672 block.set_state(_BufferBlock.PENDING)
673 except StateChangeError as e:
674 if e.state == _BufferBlock.PENDING:
676 block.wait_for_commit.wait()
679 if block.state() == _BufferBlock.COMMITTED:
681 elif block.state() == _BufferBlock.ERROR:
688 if self.copies is None:
689 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
691 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
692 block.set_state(_BufferBlock.COMMITTED, loc)
693 except Exception as e:
694 block.set_state(_BufferBlock.ERROR, e)
697 self.start_put_threads()
698 self._put_queue.put(block)
701 def get_bufferblock(self, locator):
702 return self._bufferblocks.get(locator)
705 def get_padding_block(self):
706 """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
707 when using truncate() to extend the size of a file.
709 For reference (and possible future optimization), the md5sum of the
710 padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
714 if self.padding_block is None:
715 self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
716 self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
717 self.commit_bufferblock(self.padding_block, False)
718 return self.padding_block
721 def delete_bufferblock(self, locator):
722 self._delete_bufferblock(locator)
724 def _delete_bufferblock(self, locator):
725 if locator in self._bufferblocks:
726 bb = self._bufferblocks[locator]
728 del self._bufferblocks[locator]
730 def get_block_contents(self, locator, num_retries, cache_only=False):
733 First checks to see if the locator is a BufferBlock and returns that; if
734 not, passes the request through to KeepClient.get().
738 if locator in self._bufferblocks:
739 bufferblock = self._bufferblocks[locator]
740 if bufferblock.state() != _BufferBlock.COMMITTED:
741 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
743 locator = bufferblock._locator
745 return self._keep.get_from_cache(locator)
747 return self._keep.get(locator, num_retries=num_retries)
749 def commit_all(self):
750 """Commit all outstanding buffer blocks.
752 This is a synchronous call, and will not return until all buffer blocks
753 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
756 self.repack_small_blocks(force=True, sync=True)
759 items = list(self._bufferblocks.items())
762 if v.state() != _BufferBlock.COMMITTED and v.owner:
763 # Ignore blocks with a list of owners: if they're not in COMMITTED
764 # state, they're already being committed asynchronously.
765 if isinstance(v.owner, ArvadosFile):
766 v.owner.flush(sync=False)
769 if self._put_queue is not None:
770 self._put_queue.join()
774 if v.state() == _BufferBlock.ERROR:
775 err.append((v.locator(), v.error))
777 raise KeepWriteError("Error writing some blocks", err, label="block")
780 # flush again with sync=True to remove committed bufferblocks from
783 if isinstance(v.owner, ArvadosFile):
784 v.owner.flush(sync=True)
785 elif isinstance(v.owner, list) and len(v.owner) > 0:
786 # This bufferblock is referenced by many files as a result
787 # of repacking small blocks, so don't delete it when flushing
788 # its owners, just do it after flushing them all.
789 for owner in v.owner:
790 owner.flush(sync=True)
791 self.delete_bufferblock(k)
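# A hedged sketch of handling a failed commit_all(); the KeepWriteError is
# built from the (locator, error) pairs collected above:
#
#     try:
#         block_manager.commit_all()
#     except KeepWriteError as err:
#         logging.error("some blocks failed to upload: %s", err)
#
# 'block_manager' stands in for a _BlockManager instance.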
795 def block_prefetch(self, locator):
796 """Initiate a background download of a block.
799 if not self.prefetch_lookahead:
803 if locator in self._bufferblocks:
806 self._keep.block_prefetch(locator)
809 class ArvadosFile(object):
810 """Represent a file in a Collection.
812 ArvadosFile manages the underlying representation of a file in Keep as a
813 sequence of segments spanning a set of blocks, and implements random
816 This object may be accessed from multiple threads.
820 __slots__ = ('parent', 'name', '_writers', '_committed',
821 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
823 def __init__(self, parent, name, stream=[], segments=[]):
825 ArvadosFile constructor.
828 a list of Range objects representing a block stream
831 a list of Range objects representing segments
835 self._writers = set()
836 self._committed = False
838 self.lock = parent.root_collection().lock
840 self._add_segment(stream, s.locator, s.range_size)
841 self._current_bblock = None
842 self._read_counter = 0
845 return self.parent.writable()
848 def permission_expired(self, as_of_dt=None):
849 """Returns True if any of the segment's locators is expired"""
850 for r in self._segments:
851 if KeepLocator(r.locator).permission_expired(as_of_dt):
856 def has_remote_blocks(self):
857 """Returns True if any of the segment's locators has a +R signature"""
859 for s in self._segments:
860 if '+R' in s.locator:
865 def _copy_remote_blocks(self, remote_blocks={}):
866 """Ask Keep to copy remote blocks and point to their local copies.
868 This is called from the parent Collection.
871 Shared cache of remote to local block mappings. This is used to avoid
872 doing extra work when blocks are shared by more than one file in
873 different subdirectories.
876 for s in self._segments:
877 if '+R' in s.locator:
879 loc = remote_blocks[s.locator]
881 loc = self.parent._my_keep().refresh_signature(s.locator)
882 remote_blocks[s.locator] = loc
884 self.parent.set_committed(False)
889 return copy.copy(self._segments)
892 def clone(self, new_parent, new_name):
893 """Make a copy of this file."""
894 cp = ArvadosFile(new_parent, new_name)
895 cp.replace_contents(self)
900 def replace_contents(self, other):
901 """Replace segments of this file with segments from another `ArvadosFile` object."""
905 for other_segment in other.segments():
906 new_loc = other_segment.locator
907 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
908 if other_segment.locator not in map_loc:
909 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
910 if bufferblock.state() != _BufferBlock.WRITABLE:
911 map_loc[other_segment.locator] = bufferblock.locator()
913 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
914 new_loc = map_loc[other_segment.locator]
916 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
918 self.set_committed(False)
920 def __eq__(self, other):
923 if not isinstance(other, ArvadosFile):
926 othersegs = other.segments()
928 if len(self._segments) != len(othersegs):
930 for i in range(0, len(othersegs)):
931 seg1 = self._segments[i]
936 if self.parent._my_block_manager().is_bufferblock(loc1):
937 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
939 if other.parent._my_block_manager().is_bufferblock(loc2):
940 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
942 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
943 seg1.range_start != seg2.range_start or
944 seg1.range_size != seg2.range_size or
945 seg1.segment_offset != seg2.segment_offset):
950 def __ne__(self, other):
951 return not self.__eq__(other)
954 def set_segments(self, segs):
955 self._segments = segs
958 def set_committed(self, value=True):
959 """Set committed flag.
961 If value is True, set committed to be True.
963 If value is False, set committed to be False for this and all parents.
965 if value == self._committed:
967 self._committed = value
968 if self._committed is False and self.parent is not None:
969 self.parent.set_committed(False)
973 """Get whether this is committed or not."""
974 return self._committed
977 def add_writer(self, writer):
978 """Add an ArvadosFileWriter reference to the list of writers"""
979 if isinstance(writer, ArvadosFileWriter):
980 self._writers.add(writer)
983 def remove_writer(self, writer, flush):
985 Called from ArvadosFileWriter.close(). Remove a writer reference from the list
986 and do some block maintenance tasks.
988 self._writers.remove(writer)
990 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
991 # File writer closed, not small enough for repacking
994 # All writers closed and size is adequate for repacking
995 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
999 Get whether this is closed or not. When the writers list is empty, the file
1000 is considered closed.
1002 return len(self._writers) == 0
1006 def truncate(self, size):
1007 """Shrink or expand the size of the file.
1009 If `size` is less than the size of the file, the file contents after
1010 `size` will be discarded. If `size` is greater than the current size
1011 of the file, it will be filled with zero bytes.
1014 if size < self.size():
1016 for r in self._segments:
1017 range_end = r.range_start+r.range_size
1018 if r.range_start >= size:
1019 # segment is past the truncate size, all done
1021 elif size < range_end:
1022 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1023 nr.segment_offset = r.segment_offset
1029 self._segments = new_segs
1030 self.set_committed(False)
1031 elif size > self.size():
1032 padding = self.parent._my_block_manager().get_padding_block()
1033 diff = size - self.size()
1034 while diff > config.KEEP_BLOCK_SIZE:
1035 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1036 diff -= config.KEEP_BLOCK_SIZE
1038 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1039 self.set_committed(False)
1041 # size == self.size()
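# Worked example of the padding arithmetic above (illustrative sizes):
# growing a 10-byte file to 70 MiB (73,400,320 bytes) leaves a gap of
# 73,400,310 bytes, so one full 64 MiB (67,108,864-byte) padding segment
# is appended first, followed by a final padding segment of the remaining
# 6,291,446 bytes.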
1044 def readfrom(self, offset, size, num_retries, exact=False):
1045 """Read up to `size` bytes from the file starting at `offset`.
1048 If False (default), return less data than requested if the read
1049 crosses a block boundary and the next block isn't cached. If True,
1050 only return less data than requested when hitting EOF.
1054 if size == 0 or offset >= self.size():
1056 readsegs = locators_and_ranges(self._segments, offset, size)
1059 prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1060 if prefetch_lookahead:
1061 # Doing prefetch on every read() call is surprisingly expensive
1062 # when we're trying to deliver data at 600+ MiBps and want
1063 # the read() fast path to be as lightweight as possible.
1065 # Only prefetching every 128 read operations
1066 # dramatically reduces the overhead while still
1067 # getting the benefit of prefetching (e.g. when
1068 # reading 128 KiB at a time, it checks for prefetch
1070 self._read_counter = (self._read_counter+1) % 128
1071 if self._read_counter == 1:
1072 prefetch = locators_and_ranges(self._segments,
1074 config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1075 limit=(1+prefetch_lookahead))
1080 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1082 blockview = memoryview(block)
1083 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1084 locs.add(lr.locator)
1090 if lr.locator not in locs:
1091 self.parent._my_block_manager().block_prefetch(lr.locator)
1092 locs.add(lr.locator)
1097 return b''.join(data)
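# Rough arithmetic behind the prefetch throttling above: with the 128-read
# cadence and a caller issuing 128 KiB reads, prefetch candidates are
# re-evaluated about once per 16 MiB read (128 * 128 KiB), keeping the
# common read() path cheap.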
1101 def writeto(self, offset, data, num_retries):
1102 """Write `data` to the file starting at `offset`.
1104 This will update existing bytes and/or extend the size of the file as
1108 if not isinstance(data, bytes) and not isinstance(data, memoryview):
1109 data = data.encode()
1113 if offset > self.size():
1114 self.truncate(offset)
1116 if len(data) > config.KEEP_BLOCK_SIZE:
1117 # Chunk it up into smaller writes
1119 dataview = memoryview(data)
1120 while n < len(data):
1121 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1122 n += config.KEEP_BLOCK_SIZE
1125 self.set_committed(False)
1127 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1128 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1130 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1131 self._current_bblock.repack_writes()
1132 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1133 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1134 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1136 self._current_bblock.append(data)
1138 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1140 self.parent.notify(WRITE, self.parent, self.name, (self, self))
1145 def flush(self, sync=True, num_retries=0):
1146 """Flush the current bufferblock to Keep.
1149 If True, commit block synchronously, wait until buffer block has been written.
1150 If False, commit block asynchronously, return immediately after putting block into
1153 if self.committed():
1156 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1157 if self._current_bblock.state() == _BufferBlock.WRITABLE:
1158 self._current_bblock.repack_writes()
1159 if self._current_bblock.state() != _BufferBlock.DELETED:
1160 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1164 for s in self._segments:
1165 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1167 if bb.state() != _BufferBlock.COMMITTED:
1168 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1169 to_delete.add(s.locator)
1170 s.locator = bb.locator()
1172 # Don't delete the bufferblock if it's owned by many files. It'll be
1173 # deleted after all of its owners are flush()ed.
1174 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1175 self.parent._my_block_manager().delete_bufferblock(s)
1177 self.parent.notify(MOD, self.parent, self.name, (self, self))
1181 def add_segment(self, blocks, pos, size):
1182 """Add a segment to the end of the file.
1184 `pos` and `size` reference a section of the stream described by
1185 `blocks` (a list of Range objects)
1188 self._add_segment(blocks, pos, size)
1190 def _add_segment(self, blocks, pos, size):
1191 """Internal implementation of add_segment."""
1192 self.set_committed(False)
1193 for lr in locators_and_ranges(blocks, pos, size):
1194 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1195 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1196 self._segments.append(r)
1200 """Get the file size."""
1202 n = self._segments[-1]
1203 return n.range_start + n.range_size
1208 def manifest_text(self, stream_name=".", portable_locators=False,
1209 normalize=False, only_committed=False):
1212 for segment in self._segments:
1213 loc = segment.locator
1214 if self.parent._my_block_manager().is_bufferblock(loc):
1217 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1218 if portable_locators:
1219 loc = KeepLocator(loc).stripped()
1220 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1221 segment.segment_offset, segment.range_size))
1222 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1228 def _reparent(self, newparent, newname):
1229 self.set_committed(False)
1230 self.flush(sync=True)
1231 self.parent.remove(self.name)
1232 self.parent = newparent
1234 self.lock = self.parent.root_collection().lock
1237 class ArvadosFileReader(ArvadosFileReaderBase):
1238 """Wraps ArvadosFile in a file-like object supporting reading only.
1240 Be aware that this class is NOT thread-safe, as there is no locking around
1241 updating the file pointer.
1245 def __init__(self, arvadosfile, mode="r", num_retries=None):
1246 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1247 self.arvadosfile = arvadosfile
1250 return self.arvadosfile.size()
1252 def stream_name(self):
1253 return self.arvadosfile.parent.stream_name()
1255 def readinto(self, b):
1256 data = self.read(len(b))
1257 b[:len(data)] = data
1260 @_FileLikeObjectBase._before_close
1262 def read(self, size=None, num_retries=None):
1263 """Read up to `size` bytes from the file and return the result.
1265 Starts at the current file position. If `size` is None, read the
1266 entire remainder of the file.
1270 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1273 self._filepos += len(rd)
1274 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1275 return b''.join(data)
1277 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1278 self._filepos += len(data)
1281 @_FileLikeObjectBase._before_close
1283 def readfrom(self, offset, size, num_retries=None):
1284 """Read up to `size` bytes from the stream, starting at the specified file offset.
1286 This method does not change the file position.
1288 return self.arvadosfile.readfrom(offset, size, num_retries)
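# A minimal read-side usage sketch (hedged; assumes the Collection.open()
# API from arvados.collection, which hands back readers based on this
# class; the UUID and path below are placeholders):
#
#     import arvados.collection
#
#     coll = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')
#     with coll.open('subdir/data.bin', 'rb') as reader:
#         head = reader.read(1024)            # from the current position
#         tail = reader.readfrom(4096, 512)   # positional, pointer unchanged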
1294 class ArvadosFileWriter(ArvadosFileReader):
1295 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1297 Be aware that this class is NOT thread-safe, as there is no locking around
1298 updating the file pointer.
1302 def __init__(self, arvadosfile, mode, num_retries=None):
1303 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1304 self.arvadosfile.add_writer(self)
1309 @_FileLikeObjectBase._before_close
1311 def write(self, data, num_retries=None):
1312 if self.mode[0] == "a":
1313 self._filepos = self.size()
1314 self.arvadosfile.writeto(self._filepos, data, num_retries)
1315 self._filepos += len(data)
1318 @_FileLikeObjectBase._before_close
1320 def writelines(self, seq, num_retries=None):
1322 self.write(s, num_retries=num_retries)
1324 @_FileLikeObjectBase._before_close
1325 def truncate(self, size=None):
1327 size = self._filepos
1328 self.arvadosfile.truncate(size)
1330 @_FileLikeObjectBase._before_close
1332 self.arvadosfile.flush()
1334 def close(self, flush=True):
1336 self.arvadosfile.remove_writer(self, flush)
1337 super(ArvadosFileWriter, self).close()
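# A matching write-side sketch (hedged; assumes Collection.open() with a
# write mode and Collection.save_new() from arvados.collection):
#
#     import arvados.collection
#
#     coll = arvados.collection.Collection()
#     with coll.open('results/output.txt', 'w') as writer:
#         writer.write('hello\n')      # buffered into a _BufferBlock
#     coll.save_new('example output')  # commits blocks, saves a manifest
#
# Closing the writer calls remove_writer(), which either flushes the file
# or hands small closed files to repack_small_blocks(), as defined above.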
1340 class WrappableFile(object):
1341 """An interface to an Arvados file that's compatible with io wrappers.
1344 def __init__(self, f):
1349 return self.f.close()
1351 return self.f.flush()
1352 def read(self, *args, **kwargs):
1353 return self.f.read(*args, **kwargs)
1355 return self.f.readable()
1356 def readinto(self, *args, **kwargs):
1357 return self.f.readinto(*args, **kwargs)
1358 def seek(self, *args, **kwargs):
1359 return self.f.seek(*args, **kwargs)
1361 return self.f.seekable()
1363 return self.f.tell()
1365 return self.f.writable()
1366 def write(self, *args, **kwargs):
1367 return self.f.write(*args, **kwargs)
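# A hedged sketch of the intended use: wrap an Arvados file object so the
# standard io machinery can layer text decoding or buffering on top of it.
# 'reader' and 'process' below are placeholders, not part of this module.
#
#     import io
#
#     wrapped = WrappableFile(reader)   # reader: an ArvadosFileReader
#     text = io.TextIOWrapper(wrapped, encoding='utf-8')
#     for line in text:
#         process(line)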