1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
21 from .errors import KeepWriteError, AssertionError, ArgumentError
22 from .keep import KeepLocator
23 from ._normalize_stream import normalize_stream
24 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
25 from .retry import retry_method
30 _logger = logging.getLogger('arvados.arvfile')
33 """split(path) -> streamname, filename
35 Separate the stream name and file name in a /-separated stream path and
36 return a tuple (stream_name, file_name). If no stream name is available,
41 stream_name, file_name = path.rsplit('/', 1)
42 except ValueError: # No / in string
43 stream_name, file_name = '.', path
44 return stream_name, file_name
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager.

    Every WRITABLE _BufferBlock is expected to have an owning ArvadosFile;
    repack_small_blocks() raises this when it finds one that doesn't.
    """
52 class _FileLikeObjectBase(object):
53 def __init__(self, name, mode):
59 def _before_close(orig_func):
60 @functools.wraps(orig_func)
61 def before_close_wrapper(self, *args, **kwargs):
63 raise ValueError("I/O operation on closed stream file")
64 return orig_func(self, *args, **kwargs)
65 return before_close_wrapper
70 def __exit__(self, exc_type, exc_value, traceback):
81 class ArvadosFileReaderBase(_FileLikeObjectBase):
82 def __init__(self, name, mode, num_retries=None):
83 super(ArvadosFileReaderBase, self).__init__(name, mode)
85 self.num_retries = num_retries
86 self._readline_cache = (None, None)
90 data = self.readline()
    def decompressed_name(self):
        """Return the file's name with a single trailing '.bz2' or '.gz'
        extension removed; names without those extensions are returned
        unchanged.
        """
        return re.sub(r'\.(bz2|gz)$', '', self.name)
98 @_FileLikeObjectBase._before_close
99 def seek(self, pos, whence=os.SEEK_SET):
100 if whence == os.SEEK_CUR:
102 elif whence == os.SEEK_END:
105 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
121 @_FileLikeObjectBase._before_close
123 def readall(self, size=2**20, num_retries=None):
125 data = self.read(size, num_retries=num_retries)
130 @_FileLikeObjectBase._before_close
132 def readline(self, size=float('inf'), num_retries=None):
133 cache_pos, cache_data = self._readline_cache
134 if self.tell() == cache_pos:
136 self._filepos += len(cache_data)
139 data_size = len(data[-1])
140 while (data_size < size) and (b'\n' not in data[-1]):
141 next_read = self.read(2 ** 20, num_retries=num_retries)
144 data.append(next_read)
145 data_size += len(next_read)
146 data = b''.join(data)
148 nextline_index = data.index(b'\n') + 1
150 nextline_index = len(data)
151 nextline_index = min(nextline_index, size)
152 self._filepos -= len(data) - nextline_index
153 self._readline_cache = (self.tell(), data[nextline_index:])
154 return data[:nextline_index].decode()
156 @_FileLikeObjectBase._before_close
158 def decompress(self, decompress, size, num_retries=None):
159 for segment in self.readall(size, num_retries=num_retries):
160 data = decompress(segment)
164 @_FileLikeObjectBase._before_close
166 def readall_decompressed(self, size=2**20, num_retries=None):
168 if self.name.endswith('.bz2'):
169 dc = bz2.BZ2Decompressor()
170 return self.decompress(dc.decompress, size,
171 num_retries=num_retries)
172 elif self.name.endswith('.gz'):
173 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
174 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
175 size, num_retries=num_retries)
177 return self.readall(size, num_retries=num_retries)
179 @_FileLikeObjectBase._before_close
181 def readlines(self, sizehint=float('inf'), num_retries=None):
184 for s in self.readall(num_retries=num_retries):
187 if data_size >= sizehint:
189 return b''.join(data).decode().splitlines(True)
192 raise IOError(errno.ENOSYS, "Not implemented")
194 def read(self, size, num_retries=None):
195 raise IOError(errno.ENOSYS, "Not implemented")
197 def readfrom(self, start, size, num_retries=None):
198 raise IOError(errno.ENOSYS, "Not implemented")
201 class StreamFileReader(ArvadosFileReaderBase):
202 class _NameAttribute(str):
203 # The Python file API provides a plain .name attribute.
204 # Older SDK provided a name() method.
205 # This class provides both, for maximum compatibility.
209 def __init__(self, stream, segments, name):
210 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
211 self._stream = stream
212 self.segments = segments
    def stream_name(self):
        # Name of the stream (directory path) this file's segments belong to,
        # as reported by the underlying stream object.
        return self._stream.name()
218 n = self.segments[-1]
219 return n.range_start + n.range_size
221 @_FileLikeObjectBase._before_close
223 def read(self, size, num_retries=None):
224 """Read up to 'size' bytes from the stream, starting at the current file position"""
229 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
231 lr = available_chunks[0]
232 data = self._stream.readfrom(lr.locator+lr.segment_offset,
234 num_retries=num_retries)
236 self._filepos += len(data)
239 @_FileLikeObjectBase._before_close
241 def readfrom(self, start, size, num_retries=None):
242 """Read up to 'size' bytes from the stream, starting at 'start'"""
247 for lr in locators_and_ranges(self.segments, start, size):
248 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
249 num_retries=num_retries))
250 return b''.join(data)
252 def as_manifest(self):
254 for r in self.segments:
255 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
256 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
def synchronized(orig_func):
    """Decorator for methods that must run while holding the object's lock.

    NOTE(review): the lock-acquisition statement (original line 262) is not
    visible in this chunk; presumably the wrapped call runs under
    `with self.lock:` -- confirm against the full source.
    """
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
class StateChangeError(Exception):
    """Raised on an invalid _BufferBlock state transition."""
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        # nextstate: the transition target that was rejected.
        # NOTE(review): assignment of the originating `state` attribute is not
        # visible in this chunk, but callers (e.g. commit_bufferblock) read
        # e.state -- confirm it is set here in the full source.
        self.nextstate = nextstate
273 class _BufferBlock(object):
274 """A stand-in for a Keep block that is in the process of being written.
276 Writers can append to it, get the size, and compute the Keep locator.
277 There are three valid states:
283 Block is in the process of being uploaded to Keep, append is an error.
286 The block has been written to Keep, its internal buffer has been
287 released, fetching the block will fetch it via keep client (since we
288 discarded the internal copy), and identifiers referring to the BufferBlock
289 can be replaced with the block locator.
299 def __init__(self, blockid, starting_capacity, owner):
302 the identifier for this block
305 the initial buffer capacity
308 ArvadosFile that owns this block
311 self.blockid = blockid
312 self.buffer_block = bytearray(starting_capacity)
313 self.buffer_view = memoryview(self.buffer_block)
314 self.write_pointer = 0
315 self._state = _BufferBlock.WRITABLE
318 self.lock = threading.Lock()
319 self.wait_for_commit = threading.Event()
323 def append(self, data):
324 """Append some data to the buffer.
326 Only valid if the block is in WRITABLE state. Implements an expanding
327 buffer, doubling capacity as needed to accomdate all the data.
330 if self._state == _BufferBlock.WRITABLE:
331 if not isinstance(data, bytes) and not isinstance(data, memoryview):
333 while (self.write_pointer+len(data)) > len(self.buffer_block):
334 new_buffer_block = bytearray(len(self.buffer_block) * 2)
335 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
336 self.buffer_block = new_buffer_block
337 self.buffer_view = memoryview(self.buffer_block)
338 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
339 self.write_pointer += len(data)
342 raise AssertionError("Buffer block is not writable")
344 STATE_TRANSITIONS = frozenset([
346 (PENDING, COMMITTED),
351 def set_state(self, nextstate, val=None):
352 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
353 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
354 self._state = nextstate
356 if self._state == _BufferBlock.PENDING:
357 self.wait_for_commit.clear()
359 if self._state == _BufferBlock.COMMITTED:
361 self.buffer_view = None
362 self.buffer_block = None
363 self.wait_for_commit.set()
365 if self._state == _BufferBlock.ERROR:
367 self.wait_for_commit.set()
374 """The amount of data written to the buffer."""
375 return self.write_pointer
379 """The Keep locator for this buffer's contents."""
380 if self._locator is None:
381 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
385 def clone(self, new_blockid, owner):
386 if self._state == _BufferBlock.COMMITTED:
387 raise AssertionError("Cannot duplicate committed buffer block")
388 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
389 bufferblock.append(self.buffer_view[0:self.size()])
394 self._state = _BufferBlock.DELETED
396 self.buffer_block = None
397 self.buffer_view = None
400 def repack_writes(self):
401 """Optimize buffer block by repacking segments in file sequence.
403 When the client makes random writes, they appear in the buffer block in
404 the sequence they were written rather than the sequence they appear in
405 the file. This makes for inefficient, fragmented manifests. Attempt
406 to optimize by repacking writes in file sequence.
409 if self._state != _BufferBlock.WRITABLE:
410 raise AssertionError("Cannot repack non-writable block")
412 segs = self.owner.segments()
414 # Collect the segments that reference the buffer block.
415 bufferblock_segs = [s for s in segs if s.locator == self.blockid]
417 # Collect total data referenced by segments (could be smaller than
418 # bufferblock size if a portion of the file was written and
420 write_total = sum([s.range_size for s in bufferblock_segs])
422 if write_total < self.size() or len(bufferblock_segs) > 1:
423 # If there's more than one segment referencing this block, it is
424 # due to out-of-order writes and will produce a fragmented
425 # manifest, so try to optimize by re-packing into a new buffer.
426 contents = self.buffer_view[0:self.write_pointer].tobytes()
427 new_bb = _BufferBlock(None, write_total, None)
428 for t in bufferblock_segs:
429 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
430 t.segment_offset = new_bb.size() - t.range_size
432 self.buffer_block = new_bb.buffer_block
433 self.buffer_view = new_bb.buffer_view
434 self.write_pointer = new_bb.write_pointer
437 self.owner.set_segments(segs)
440 return "<BufferBlock %s>" % (self.blockid)
443 class NoopLock(object):
447 def __exit__(self, exc_type, exc_value, traceback):
450 def acquire(self, blocking=False):
def must_be_writable(orig_func):
    """Decorator: only run the wrapped method when the object is writable.

    Intended for methods of objects exposing a ``writable()`` predicate
    (e.g. ArvadosFile / Collection). If ``self.writable()`` is false the
    call is rejected with ``IOError(errno.EROFS)``.
    """
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        if self.writable():
            return orig_func(self, *args, **kwargs)
        raise IOError(errno.EROFS, "Collection is read-only.")
    return wrapper
466 class _BlockManager(object):
467 """BlockManager handles buffer blocks.
469 Also handles background block uploads, and background block prefetch for a
470 Collection of ArvadosFiles.
474 DEFAULT_PUT_THREADS = 2
476 def __init__(self, keep,
480 storage_classes_func=None):
481 """keep: KeepClient object to use"""
483 self._bufferblocks = collections.OrderedDict()
484 self._put_queue = None
485 self._put_threads = None
486 self.lock = threading.Lock()
487 self.prefetch_lookahead = self._keep.num_prefetch_threads
488 self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
490 self.storage_classes = storage_classes_func or (lambda: [])
491 self._pending_write_size = 0
492 self.threads_lock = threading.Lock()
493 self.padding_block = None
494 self.num_retries = num_retries
497 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
498 """Allocate a new, empty bufferblock in WRITABLE state and return it.
501 optional block identifier, otherwise one will be automatically assigned
504 optional capacity, otherwise will use default capacity
507 ArvadosFile that owns this block
510 return self._alloc_bufferblock(blockid, starting_capacity, owner)
512 def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
514 blockid = str(uuid.uuid4())
515 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
516 self._bufferblocks[bufferblock.blockid] = bufferblock
520 def dup_block(self, block, owner):
521 """Create a new bufferblock initialized with the content of an existing bufferblock.
524 the buffer block to copy.
527 ArvadosFile that owns the new block
530 new_blockid = str(uuid.uuid4())
531 bufferblock = block.clone(new_blockid, owner)
532 self._bufferblocks[bufferblock.blockid] = bufferblock
    def is_bufferblock(self, locator):
        # True if `locator` refers to a block currently tracked in this
        # manager's in-memory bufferblock table.
        return locator in self._bufferblocks
539 def _commit_bufferblock_worker(self):
540 """Background uploader thread."""
544 bufferblock = self._put_queue.get()
545 if bufferblock is None:
548 if self.copies is None:
549 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
551 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
552 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
553 except Exception as e:
554 bufferblock.set_state(_BufferBlock.ERROR, e)
556 if self._put_queue is not None:
557 self._put_queue.task_done()
559 def start_put_threads(self):
560 with self.threads_lock:
561 if self._put_threads is None:
562 # Start uploader threads.
564 # If we don't limit the Queue size, the upload queue can quickly
565 # grow to take up gigabytes of RAM if the writing process is
566 # generating data more quickly than it can be sent to the Keep
569 # With two upload threads and a queue size of 2, this means up to 4
570 # blocks pending. If they are full 64 MiB blocks, that means up to
571 # 256 MiB of internal buffering, which is the same size as the
572 # default download block cache in KeepClient.
573 self._put_queue = queue.Queue(maxsize=2)
575 self._put_threads = []
576 for i in range(0, self.num_put_threads):
577 thread = threading.Thread(target=self._commit_bufferblock_worker)
578 self._put_threads.append(thread)
583 def stop_threads(self):
584 """Shut down and wait for background upload and download threads to finish."""
586 if self._put_threads is not None:
587 for t in self._put_threads:
588 self._put_queue.put(None)
589 for t in self._put_threads:
591 self._put_threads = None
592 self._put_queue = None
597 def __exit__(self, exc_type, exc_value, traceback):
601 def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
602 """Packs small blocks together before uploading"""
604 self._pending_write_size += closed_file_size
606 # Check if there are enough small blocks for filling up one in full
607 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
610 # Search blocks ready for getting packed together before being
612 # A WRITABLE block always has an owner.
613 # A WRITABLE block with its owner.closed() implies that its
614 # size is <= KEEP_BLOCK_SIZE/2.
616 small_blocks = [b for b in self._bufferblocks.values()
617 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
618 except AttributeError:
619 # Writable blocks without owner shouldn't exist.
620 raise UnownedBlockError()
622 if len(small_blocks) <= 1:
623 # Not enough small blocks for repacking
626 for bb in small_blocks:
629 # Update the pending write size count with its true value, just in case
630 # some small file was opened, written and closed several times.
631 self._pending_write_size = sum([b.size() for b in small_blocks])
633 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
636 new_bb = self._alloc_bufferblock()
639 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
640 bb = small_blocks.pop(0)
641 new_bb.owner.append(bb.owner)
642 self._pending_write_size -= bb.size()
643 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
644 files.append((bb, new_bb.write_pointer - bb.size()))
646 self.commit_bufferblock(new_bb, sync=sync)
648 for bb, new_bb_segment_offset in files:
649 newsegs = bb.owner.segments()
651 if s.locator == bb.blockid:
652 s.locator = new_bb.blockid
653 s.segment_offset = new_bb_segment_offset+s.segment_offset
654 bb.owner.set_segments(newsegs)
655 self._delete_bufferblock(bb.blockid)
657 def commit_bufferblock(self, block, sync):
658 """Initiate a background upload of a bufferblock.
661 The block object to upload
664 If `sync` is True, upload the block synchronously.
665 If `sync` is False, upload the block asynchronously. This will
666 return immediately unless the upload queue is at capacity, in
667 which case it will wait on an upload queue slot.
671 # Mark the block as PENDING so to disallow any more appends.
672 block.set_state(_BufferBlock.PENDING)
673 except StateChangeError as e:
674 if e.state == _BufferBlock.PENDING:
676 block.wait_for_commit.wait()
679 if block.state() == _BufferBlock.COMMITTED:
681 elif block.state() == _BufferBlock.ERROR:
688 if self.copies is None:
689 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
691 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
692 block.set_state(_BufferBlock.COMMITTED, loc)
693 except Exception as e:
694 block.set_state(_BufferBlock.ERROR, e)
697 self.start_put_threads()
698 self._put_queue.put(block)
    def get_bufferblock(self, locator):
        # Look up a _BufferBlock by its locator; returns None when the
        # locator is not a known bufferblock.
        return self._bufferblocks.get(locator)
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
        """
        if self.padding_block is None:
            # Lazily create and upload the zero block once; it is cached on
            # the manager and reused for every subsequent padded truncate().
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            # A freshly allocated bytearray is already zero-filled, so simply
            # marking the whole capacity as written yields an all-zeros block.
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block
    def delete_bufferblock(self, locator):
        # Public entry point; delegates to the internal helper, which is also
        # called directly by repack_small_blocks().
        self._delete_bufferblock(locator)
724 def _delete_bufferblock(self, locator):
725 if locator in self._bufferblocks:
726 bb = self._bufferblocks[locator]
728 del self._bufferblocks[locator]
730 def get_block_contents(self, locator, num_retries, cache_only=False):
733 First checks to see if the locator is a BufferBlock and return that, if
734 not, passes the request through to KeepClient.get().
738 if locator in self._bufferblocks:
739 bufferblock = self._bufferblocks[locator]
740 if bufferblock.state() != _BufferBlock.COMMITTED:
741 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
743 locator = bufferblock._locator
745 return self._keep.get_from_cache(locator)
747 return self._keep.get(locator, num_retries=num_retries)
749 def commit_all(self):
750 """Commit all outstanding buffer blocks.
752 This is a synchronous call, and will not return until all buffer blocks
753 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
756 self.repack_small_blocks(force=True, sync=True)
759 items = list(self._bufferblocks.items())
762 if v.state() != _BufferBlock.COMMITTED and v.owner:
763 # Ignore blocks with a list of owners, as if they're not in COMMITTED
764 # state, they're already being committed asynchronously.
765 if isinstance(v.owner, ArvadosFile):
766 v.owner.flush(sync=False)
769 if self._put_queue is not None:
770 self._put_queue.join()
774 if v.state() == _BufferBlock.ERROR:
775 err.append((v.locator(), v.error))
777 raise KeepWriteError("Error writing some blocks", err, label="block")
780 # flush again with sync=True to remove committed bufferblocks from
783 if isinstance(v.owner, ArvadosFile):
784 v.owner.flush(sync=True)
785 elif isinstance(v.owner, list) and len(v.owner) > 0:
786 # This bufferblock is referenced by many files as a result
787 # of repacking small blocks, so don't delete it when flushing
788 # its owners, just do it after flushing them all.
789 for owner in v.owner:
790 owner.flush(sync=True)
791 self.delete_bufferblock(k)
795 def block_prefetch(self, locator):
796 """Initiate a background download of a block.
799 if not self.prefetch_lookahead:
803 if locator in self._bufferblocks:
806 self._keep.block_prefetch(locator)
809 class ArvadosFile(object):
810 """Represent a file in a Collection.
812 ArvadosFile manages the underlying representation of a file in Keep as a
813 sequence of segments spanning a set of blocks, and implements random
816 This object may be accessed from multiple threads.
820 __slots__ = ('parent', 'name', '_writers', '_committed',
821 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
823 def __init__(self, parent, name, stream=[], segments=[]):
825 ArvadosFile constructor.
828 a list of Range objects representing a block stream
831 a list of Range objects representing segments
835 self._writers = set()
836 self._committed = False
838 self.lock = parent.root_collection().lock
840 self._add_segment(stream, s.locator, s.range_size)
841 self._current_bblock = None
842 self._read_counter = 0
845 return self.parent.writable()
848 def permission_expired(self, as_of_dt=None):
849 """Returns True if any of the segment's locators is expired"""
850 for r in self._segments:
851 if KeepLocator(r.locator).permission_expired(as_of_dt):
856 def has_remote_blocks(self):
857 """Returns True if any of the segment's locators has a +R signature"""
859 for s in self._segments:
860 if '+R' in s.locator:
865 def _copy_remote_blocks(self, remote_blocks={}):
866 """Ask Keep to copy remote blocks and point to their local copies.
868 This is called from the parent Collection.
871 Shared cache of remote to local block mappings. This is used to avoid
872 doing extra work when blocks are shared by more than one file in
873 different subdirectories.
876 for s in self._segments:
877 if '+R' in s.locator:
879 loc = remote_blocks[s.locator]
881 loc = self.parent._my_keep().refresh_signature(s.locator)
882 remote_blocks[s.locator] = loc
884 self.parent.set_committed(False)
889 return copy.copy(self._segments)
892 def clone(self, new_parent, new_name):
893 """Make a copy of this file."""
894 cp = ArvadosFile(new_parent, new_name)
895 cp.replace_contents(self)
900 def replace_contents(self, other):
901 """Replace segments of this file with segments from another `ArvadosFile` object."""
905 for other_segment in other.segments():
906 new_loc = other_segment.locator
907 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
908 if other_segment.locator not in map_loc:
909 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
910 if bufferblock.state() != _BufferBlock.WRITABLE:
911 map_loc[other_segment.locator] = bufferblock.locator()
913 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
914 new_loc = map_loc[other_segment.locator]
916 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
918 self.set_committed(False)
920 def __eq__(self, other):
923 if not isinstance(other, ArvadosFile):
926 othersegs = other.segments()
928 if len(self._segments) != len(othersegs):
930 for i in range(0, len(othersegs)):
931 seg1 = self._segments[i]
936 if self.parent._my_block_manager().is_bufferblock(loc1):
937 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
939 if other.parent._my_block_manager().is_bufferblock(loc2):
940 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
942 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
943 seg1.range_start != seg2.range_start or
944 seg1.range_size != seg2.range_size or
945 seg1.segment_offset != seg2.segment_offset):
    def __ne__(self, other):
        # Explicit inverse of __eq__ (Python 2 heritage; Python 3 derives
        # this automatically from __eq__).
        return not self.__eq__(other)
    def set_segments(self, segs):
        # Replace the file's segment list wholesale; no copy is made, so the
        # caller must not mutate `segs` afterwards.
        self._segments = segs
958 def set_committed(self, value=True):
959 """Set committed flag.
961 If value is True, set committed to be True.
963 If value is False, set committed to be False for this and all parents.
965 if value == self._committed:
967 self._committed = value
968 if self._committed is False and self.parent is not None:
969 self.parent.set_committed(False)
973 """Get whether this is committed or not."""
974 return self._committed
977 def add_writer(self, writer):
978 """Add an ArvadosFileWriter reference to the list of writers"""
979 if isinstance(writer, ArvadosFileWriter):
980 self._writers.add(writer)
983 def remove_writer(self, writer, flush):
985 Called from ArvadosFileWriter.close(). Remove a writer reference from the list
986 and do some block maintenance tasks.
988 self._writers.remove(writer)
990 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
991 # File writer closed, not small enough for repacking
994 # All writers closed and size is adequate for repacking
995 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
999 Get whether this is closed or not. When the writers list is empty, the file
1000 is supposed to be closed.
1002 return len(self._writers) == 0
1006 def truncate(self, size):
1007 """Shrink or expand the size of the file.
1009 If `size` is less than the size of the file, the file contents after
1010 `size` will be discarded. If `size` is greater than the current size
1011 of the file, it will be filled with zero bytes.
1014 if size < self.size():
1016 for r in self._segments:
1017 range_end = r.range_start+r.range_size
1018 if r.range_start >= size:
1019 # segment is past the trucate size, all done
1021 elif size < range_end:
1022 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1023 nr.segment_offset = r.segment_offset
1029 self._segments = new_segs
1030 self.set_committed(False)
1031 elif size > self.size():
1032 padding = self.parent._my_block_manager().get_padding_block()
1033 diff = size - self.size()
1034 while diff > config.KEEP_BLOCK_SIZE:
1035 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1036 diff -= config.KEEP_BLOCK_SIZE
1038 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1039 self.set_committed(False)
1041 # size == self.size()
1044 def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
1045 """Read up to `size` bytes from the file starting at `offset`.
1049 * exact: bool --- If False (default), return less data than
1050 requested if the read crosses a block boundary and the next
1051 block isn't cached. If True, only return less data than
1052 requested when hitting EOF.
1054 * return_memoryview: bool --- If False (default) return a
1055 `bytes` object, which may entail making a copy in some
1056 situations. If True, return a `memoryview` object which may
1057 avoid making a copy, but may be incompatible with code
1058 expecting a `bytes` object.
1063 if size == 0 or offset >= self.size():
1064 return memoryview(b'') if return_memoryview else b''
1065 readsegs = locators_and_ranges(self._segments, offset, size)
1068 prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1069 if prefetch_lookahead:
1070 # Doing prefetch on every read() call is surprisingly expensive
1071 # when we're trying to deliver data at 600+ MiBps and want
1072 # the read() fast path to be as lightweight as possible.
1074 # Only prefetching every 128 read operations
1075 # dramatically reduces the overhead while still
1076 # getting the benefit of prefetching (e.g. when
1077 # reading 128 KiB at a time, it checks for prefetch
1079 self._read_counter = (self._read_counter+1) % 128
1080 if self._read_counter == 1:
1081 prefetch = locators_and_ranges(self._segments,
1083 config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1084 limit=(1+prefetch_lookahead))
1089 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1091 blockview = memoryview(block)
1092 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1093 locs.add(lr.locator)
1099 if lr.locator not in locs:
1100 self.parent._my_block_manager().block_prefetch(lr.locator)
1101 locs.add(lr.locator)
1104 return data[0] if return_memoryview else data[0].tobytes()
1106 return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1111 def writeto(self, offset, data, num_retries):
1112 """Write `data` to the file starting at `offset`.
1114 This will update existing bytes and/or extend the size of the file as
1118 if not isinstance(data, bytes) and not isinstance(data, memoryview):
1119 data = data.encode()
1123 if offset > self.size():
1124 self.truncate(offset)
1126 if len(data) > config.KEEP_BLOCK_SIZE:
1127 # Chunk it up into smaller writes
1129 dataview = memoryview(data)
1130 while n < len(data):
1131 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1132 n += config.KEEP_BLOCK_SIZE
1135 self.set_committed(False)
1137 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1138 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1140 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1141 self._current_bblock.repack_writes()
1142 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1143 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1144 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1146 self._current_bblock.append(data)
1148 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1150 self.parent.notify(WRITE, self.parent, self.name, (self, self))
1155 def flush(self, sync=True, num_retries=0):
1156 """Flush the current bufferblock to Keep.
1159 If True, commit block synchronously, wait until buffer block has been written.
1160 If False, commit block asynchronously, return immediately after putting block into
1163 if self.committed():
1166 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1167 if self._current_bblock.state() == _BufferBlock.WRITABLE:
1168 self._current_bblock.repack_writes()
1169 if self._current_bblock.state() != _BufferBlock.DELETED:
1170 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1174 for s in self._segments:
1175 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1177 if bb.state() != _BufferBlock.COMMITTED:
1178 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1179 to_delete.add(s.locator)
1180 s.locator = bb.locator()
1182 # Don't delete the bufferblock if it's owned by many files. It'll be
1183 # deleted after all of its owners are flush()ed.
1184 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1185 self.parent._my_block_manager().delete_bufferblock(s)
1187 self.parent.notify(MOD, self.parent, self.name, (self, self))
1191 def add_segment(self, blocks, pos, size):
1192 """Add a segment to the end of the file.
1194 `pos` and `offset` reference a section of the stream described by
1195 `blocks` (a list of Range objects)
1198 self._add_segment(blocks, pos, size)
1200 def _add_segment(self, blocks, pos, size):
1201 """Internal implementation of add_segment."""
1202 self.set_committed(False)
1203 for lr in locators_and_ranges(blocks, pos, size):
1204 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1205 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1206 self._segments.append(r)
1210 """Get the file size."""
1212 n = self._segments[-1]
1213 return n.range_start + n.range_size
# Render this single file as manifest stream text.
# NOTE(review): several lines are elided from this dump (inline numbering
# jumps), including the initialization of `buf`/`filestream` and the final
# return -- TODO confirm against upstream.
1218 def manifest_text(self, stream_name=".", portable_locators=False,
1219 normalize=False, only_committed=False):
1222 for segment in self._segments:
1223 loc = segment.locator
# Segments still backed by an in-memory buffer block get the block's current
# locator substituted (the `only_committed` skip, ~lines 1225-1226, is
# elided from this view).
1224 if self.parent._my_block_manager().is_bufferblock(loc):
1227 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
# Strip permission hints so the locator is portable across clusters.
1228 if portable_locators:
1229 loc = KeepLocator(loc).stripped()
1230 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1231 segment.segment_offset, segment.range_size))
# Normalize the one-file stream dict into manifest text.
1232 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
# Move this file to a new parent collection under a new name: flush all
# buffered data, detach from the old parent, then adopt the new parent's lock.
1238 def _reparent(self, newparent, newname):
1239 self.set_committed(False)
# Make sure everything is written out before detaching from the old parent.
1240 self.flush(sync=True)
1241 self.parent.remove(self.name)
1242 self.parent = newparent
# (line 1243 is elided from this dump -- presumably `self.name = newname`;
# TODO confirm against upstream)
1244 self.lock = self.parent.root_collection().lock
1247 class ArvadosFileReader(ArvadosFileReaderBase):
1248 """Wraps ArvadosFile in a file-like object supporting reading only.
1250 Be aware that this class is NOT thread safe as there is no locking around
1251 updating file pointer.
# Mode defaults to "r"; num_retries is forwarded to the underlying ArvadosFile
# read calls via the base class.
1255 def __init__(self, arvadosfile, mode="r", num_retries=None):
1256 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1257 self.arvadosfile = arvadosfile
# size(): total length of the underlying file in bytes (the `def size(self):`
# header line is elided from this dump).
1260 return self.arvadosfile.size()
# Name of the manifest stream containing this file.
1262 def stream_name(self):
1263 return self.arvadosfile.parent.stream_name()
# Fill writable buffer `b` with up to len(b) bytes read from the current file
# position (the `return len(data)` tail of this method is elided here --
# TODO confirm against upstream).
1265 def readinto(self, b):
1266 data = self.read(len(b))
1267 b[:len(data)] = data
@_FileLikeObjectBase._before_close
@retry_method
def read(self, size=-1, num_retries=None, return_memoryview=False):
    """Read up to `size` bytes from the file and return the result.

    Starts at the current file position. If `size` is negative or None,
    read the entire remainder of the file.

    Returns a `bytes` object, unless `return_memoryview` is True,
    in which case it returns a memory view, which may avoid an
    unnecessary data copy in some situations.
    """
    # BUG FIX: test `size is None` BEFORE `size < 0`. In Python 3 the
    # original order (`size < 0 or size is None`) raised
    # TypeError("'<' not supported between ... 'NoneType' and 'int'")
    # for the documented size=None case, because `None < 0` is evaluated
    # first and ordering comparisons with None are illegal.
    if size is None or size < 0:
        data = []
        # specify exact=False, return_memoryview=True here so that we
        # only copy data once into the final buffer.
        rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
        while rd:
            data.append(rd)
            self._filepos += len(rd)
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
        return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
    else:
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
        self._filepos += len(data)
        return data
1302 @_FileLikeObjectBase._before_close
# (the @retry_method decorator line, 1303, is elided from this dump --
# TODO confirm against upstream)
1304 def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1305 """Read up to `size` bytes from the stream, starting at the specified file offset.
1307 This method does not change the file position.
1309 Returns a `bytes` object, unless `return_memoryview` is True,
1310 in which case it returns a memory view, which may avoid an
1311 unnecessary data copy in some situations.
# Pure delegation: forwards to the underlying ArvadosFile with exact=True;
# see ArvadosFile.readfrom for the semantics of that flag.
1314 return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1320 class ArvadosFileWriter(ArvadosFileReader):
1321 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1323 Be aware that this class is NOT thread safe as there is no locking around
1324 updating file pointer.
# Register as a writer so the underlying file knows to wait for this wrapper
# on flush/close.
1328 def __init__(self, arvadosfile, mode, num_retries=None):
1329 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1330 self.arvadosfile.add_writer(self)
1335 @_FileLikeObjectBase._before_close
# (a @retry_method decorator line, ~1336, appears to be elided from this dump)
1337 def write(self, data, num_retries=None):
# Append mode always writes at end-of-file regardless of the seek position.
1338 if self.mode[0] == "a":
1339 self._filepos = self.size()
1340 self.arvadosfile.writeto(self._filepos, data, num_retries)
1341 self._filepos += len(data)
# (the `return len(data)` tail, ~line 1342, is elided -- TODO confirm)
1344 @_FileLikeObjectBase._before_close
1346 def writelines(self, seq, num_retries=None):
# Writes each element of `seq` in turn at the current position (the
# `for s in seq:` loop header, ~line 1347, is elided from this dump).
1348 self.write(s, num_retries=num_retries)
1350 @_FileLikeObjectBase._before_close
# Truncate (or extend) the file to `size` bytes; defaults to the current
# file position (the `if size is None:` guard, ~line 1352, is elided here).
1351 def truncate(self, size=None):
1353 size = self._filepos
1354 self.arvadosfile.truncate(size)
1356 @_FileLikeObjectBase._before_close
# flush(): delegate to the underlying ArvadosFile (the `def flush(...)` header
# line, ~1357, is elided from this dump).
1358 self.arvadosfile.flush()
# Deregister from the file (optionally flushing it) and close the wrapper.
# (a guard line, ~1361, presumably `if not self.closed:`, is elided here)
1360 def close(self, flush=True):
1362 self.arvadosfile.remove_writer(self, flush)
1363 super(ArvadosFileWriter, self).close()
1366 class WrappableFile(object):
1367 """An interface to an Arvados file that's compatible with io wrappers.
# NOTE(review): several `def` header lines (close/flush/readable/seekable/
# tell/writable) and the __init__ body are elided from this dump (the inline
# numbering jumps). Every visible `return` line below delegates the
# like-named io method to the wrapped file object stored as `self.f`.
1370 def __init__(self, f):
# close(): delegates to the wrapped file (its `def` header is elided).
1375 return self.f.close()
# flush(): delegates to the wrapped file (its `def` header is elided).
1377 return self.f.flush()
1378 def read(self, *args, **kwargs):
1379 return self.f.read(*args, **kwargs)
# readable(): delegates to the wrapped file (its `def` header is elided).
1381 return self.f.readable()
1382 def readinto(self, *args, **kwargs):
1383 return self.f.readinto(*args, **kwargs)
1384 def seek(self, *args, **kwargs):
1385 return self.f.seek(*args, **kwargs)
# seekable(): delegates to the wrapped file (its `def` header is elided).
1387 return self.f.seekable()
# tell(): delegates to the wrapped file (its `def` header is elided).
1389 return self.f.tell()
# writable(): delegates to the wrapped file (its `def` header is elided).
1391 return self.f.writable()
1392 def write(self, *args, **kwargs):
1393 return self.f.write(*args, **kwargs)