1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
# Module-level logger for buffer-block and file I/O diagnostics.
_logger = logging.getLogger('arvados.arvfile')
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.' (the root stream).
    """
    # NOTE(review): the enclosing `def split(path):` and `try:` lines are
    # elided from this extract.
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        # Whole path is a bare file name: place it in the root stream.
        stream_name, file_name = '.', path
    return stream_name, file_name
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
class _FileLikeObjectBase(object):
    # Minimal file-like base class: records a name and mode, and supplies the
    # _before_close decorator used by subclasses to guard I/O methods.
    # NOTE(review): parts of this class (closed-flag setup, close(),
    # __enter__) are elided from this extract.
    def __init__(self, name, mode):

    def _before_close(orig_func):
        # Decorator: reject the call once the stream has been closed.
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
                # presumably guarded by `if self.closed:` (elided) — confirm
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __exit__(self, exc_type, exc_value, traceback):
class ArvadosFileReaderBase(_FileLikeObjectBase):
    # Shared read-side machinery (seek/readline/readlines plus bz2/gzip
    # decompression helpers) built on the read()/readfrom() primitives that
    # subclasses must override.  Many interior lines are elided in this
    # extract.
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self.num_retries = num_retries
        # (file position, remaining data) pair cached by readline().
        self._readline_cache = (None, None)

            data = self.readline()

    def decompressed_name(self):
        # File name with a trailing .bz2/.gz suffix stripped.
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
        elif whence == os.SEEK_END:
            # presumably inside `if pos < 0:` (elided) — confirm
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")

    @_FileLikeObjectBase._before_close
    def readall(self, size=2**20, num_retries=None):
        # Generator: yields successive chunks of up to `size` bytes.
            data = self.read(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def readline(self, size=float('inf'), num_retries=None):
        # Read one line (up to `size` bytes), using the readline cache to
        # avoid re-reading data fetched past the previous newline.
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            self._filepos += len(cache_data)
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
            nextline_index = data.index(b'\n') + 1
            # except ValueError (elided): no newline found, take everything
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        # Rewind logical position to just past the returned line and cache
        # the unconsumed remainder for the next call.
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    def decompress(self, decompress, size, num_retries=None):
        # Generator: feed raw chunks through `decompress` and yield output.
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)

    @_FileLikeObjectBase._before_close
    def readall_decompressed(self, size=2**20, num_retries=None):
        # Pick a decompressor from the file-name suffix; plain readall()
        # for uncompressed files.
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect a gzip header.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def readlines(self, sizehint=float('inf'), num_retries=None):
        for s in self.readall(num_retries=num_retries):
            if data_size >= sizehint:
        return b''.join(data).decode().splitlines(True)

        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        # Subclass responsibility.
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        # Subclass responsibility.
        raise IOError(errno.ENOSYS, "Not implemented")
class StreamFileReader(ArvadosFileReaderBase):
    # Read-only view of one file within a manifest stream; byte ranges are
    # resolved through the parent stream's readfrom().
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

        # size(): file ends where the last segment ends.
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
            # Only the first chunk is fetched per call; callers loop.
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         num_retries=num_retries)
        self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        # Render this single file as a one-stream manifest text line.
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
def synchronized(orig_func):
    """Decorator: run the wrapped method while holding ``self.lock``."""
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        # NOTE(review): the `with self.lock:` line is elided from this
        # extract; the indented return below implies it — confirm.
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
class StateChangeError(Exception):
    # Raised on an invalid _BufferBlock state transition.  Callers read
    # `.state` (see commit_bufferblock), so the elided line here presumably
    # assigns `self.state = state` — confirm against full source.
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.nextstate = nextstate
class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    Block is in the process of being uploaded to Keep, append is an error.

    The block has been written to Keep, its internal buffer has been
    released, fetching the block will fetch it via keep client (since we
    discarded the internal copy), and identifiers referring to the BufferBlock
    can be replaced with the block locator.
    """

    def __init__(self, blockid, starting_capacity, owner):
        # blockid:
        #   the identifier for this block
        # starting_capacity:
        #   the initial buffer capacity
        # owner:
        #   ArvadosFile that owns this block
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()

    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accomdate all the data.
        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
            # Double the backing buffer until the new data fits.
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
        (PENDING, COMMITTED),

    def set_state(self, nextstate, val=None):
        # Enforce the legal state machine, then perform per-state side
        # effects (clearing/setting the commit event, releasing buffers).
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            # Data is now in Keep; drop the in-memory copy.
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.wait_for_commit.set()

        """The amount of data written to the buffer."""
        return self.write_pointer

        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())

    def clone(self, new_blockid, owner):
        # Copy this block's current contents into a fresh writable block.
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])

        # clear(): mark deleted and release the buffers.
        self._state = _BufferBlock.DELETED
        self.buffer_block = None
        self.buffer_view = None

    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.
        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            # Adopt the repacked buffer in place.
            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer

            self.owner.set_segments(segs)

        # __repr__ body:
        return "<BufferBlock %s>" % (self.blockid)
class NoopLock(object):
    # Lock-shaped object whose operations do nothing; used where real
    # synchronization is unnecessary.
    def __exit__(self, exc_type, exc_value, traceback):

    def acquire(self, blocking=False):
def must_be_writable(orig_func):
    """Decorator for methods that require a writable collection.

    Before delegating to ``orig_func``, checks ``self.writable()`` and
    raises ``IOError(errno.EROFS)`` when the collection is read-only.
    The wrapped function's metadata is preserved via ``functools.wraps``.
    """
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.
    """

    DEFAULT_PUT_THREADS = 2

    def __init__(self, keep,
                 storage_classes_func=None):
        """keep: KeepClient object to use"""
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self.lock = threading.Lock()
        self.prefetch_lookahead = self._keep.num_prefetch_threads
        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
        self.storage_classes = storage_classes_func or (lambda: [])
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self.num_retries = num_retries

    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        blockid:
          optional block identifier, otherwise one will be automatically assigned
        starting_capacity:
          optional capacity, otherwise will use default capacity
        owner:
          ArvadosFile that owns this block
        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
            # presumably inside `if blockid is None:` (elided) — confirm
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock

    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        block:
          the buffer block to copy.
        owner:
          ArvadosFile that owns the new block
        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock

    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        # Loops pulling blocks off the put queue; a None sentinel stops the
        # thread (see stop_threads).
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep

                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)

    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            # One None sentinel per worker so every thread wakes and exits.
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
        self._put_threads = None
        self._put_queue = None

    def __exit__(self, exc_type, exc_value, traceback):

    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):

        # Search blocks ready for getting packed together before being
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking

        for bb in small_blocks:

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:

        new_bb = self._alloc_bufferblock()
        # Pack as many whole small blocks as fit in one Keep block.
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        # Re-point every owner's segments at the packed block.
        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        block:
          The block object to upload

        sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.
        """
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                    # Upload already in flight; optionally wait for it.
                    block.wait_for_commit.wait()
            if block.state() == _BufferBlock.COMMITTED:
            elif block.state() == _BufferBlock.ERROR:

            # Synchronous path: put directly from this thread.
            if self.copies is None:
                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
            block.set_state(_BufferBlock.COMMITTED, loc)
        except Exception as e:
            block.set_state(_BufferBlock.ERROR, e)
            # Asynchronous path: hand off to the uploader threads.
            self.start_put_threads()
            self._put_queue.put(block)

    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
        """
        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        if locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block's contents.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().
        """
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                    # Committed: fall through to Keep using the real locator.
                    locator = bufferblock._locator
            return self._keep.get_from_cache(locator)
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
        """
        self.repack_small_blocks(force=True, sync=True)

            items = listitems(self._bufferblocks)

            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners, as if they're not in COMMITTED
                # state, they're already being committed asynchronously.
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        if self._put_queue is not None:
            self._put_queue.join()

                # Gather upload failures and report them together.
                if v.state() == _BufferBlock.ERROR:
                    err.append((v.locator(), v.error))
                raise KeepWriteError("Error writing some blocks", err, label="block")

        # flush again with sync=True to remove committed bufferblocks from
            if isinstance(v.owner, ArvadosFile):
                v.owner.flush(sync=True)
            elif isinstance(v.owner, list) and len(v.owner) > 0:
                # This bufferblock is referenced by many files as a result
                # of repacking small blocks, so don't delete it when flushing
                # its owners, just do it after flushing them all.
                for owner in v.owner:
                    owner.flush(sync=True)
                self.delete_bufferblock(k)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.
        """
        if not self.prefetch_lookahead:

            if locator in self._bufferblocks:

        self._keep.block_prefetch(locator)
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.
    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        stream: a list of Range objects representing a block stream

        segments: a list of Range objects representing segments
        """
        self._writers = set()
        self._committed = False
        self.lock = parent.root_collection().lock
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        self._read_counter = 0

        # writable(): delegates to the parent collection.
        return self.parent.writable()

    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):

    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""

        for s in self._segments:
            if '+R' in s.locator:

    def _copy_remote_blocks(self, remote_blocks={}):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        remote_blocks:
          Shared cache of remote to local block mappings.  This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories.
        """
        for s in self._segments:
            if '+R' in s.locator:
                    loc = remote_blocks[s.locator]
                    # Cache miss: ask Keep for a fresh local signature.
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                self.parent.set_committed(False)

        # segments(): defensive shallow copy.
        return copy.copy(self._segments)

    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)

    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                        # Still writable: duplicate the buffer for this file.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        # Equality by segment-level comparison with stripped locators.
        if not isinstance(other, ArvadosFile):
            othersegs = other.segments()
            if len(self._segments) != len(othersegs):
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):

    def __ne__(self, other):
        return not self.__eq__(other)

    def set_segments(self, segs):
        self._segments = segs

    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

        """Get whether this is committed or not."""
        return self._committed

    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.
        """
        if size < self.size():
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the trucate size, all done
                elif size < range_end:
                    # Straddling segment: keep the leading part only.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Extend with zero-filled padding-block segments.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
            # size == self.size()

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """
            if size == 0 or offset >= self.size():
            readsegs = locators_and_ranges(self._segments, offset, size)

            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
            if prefetch_lookahead:
                # Doing prefetch on every read() call is surprisingly expensive
                # when we're trying to deliver data at 600+ MiBps and want
                # the read() fast path to be as lightweight as possible.

                # Only prefetching every 128 read operations
                # dramatically reduces the overhead while still
                # getting the benefit of prefetching (e.g. when
                # reading 128 KiB at a time, it checks for prefetch
                self._read_counter = (self._read_counter+1) % 128
                if self._read_counter == 1:
                    prefetch = locators_and_ranges(self._segments,
                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
                                                   limit=(1+prefetch_lookahead))

            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                locs.add(lr.locator)

                if lr.locator not in locs:
                    self.parent._my_block_manager().block_prefetch(lr.locator)
                    locs.add(lr.locator)

        return b''.join(data)

    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()

        if offset > self.size():
            # Zero-fill the gap before the write position.
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try to make room; otherwise commit and start a fresh block.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
        """
        if self.committed():

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

            # Replace bufferblock locators with committed Keep locators.
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()

                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `offset` reference a section of the stream described by
        `blocks` (a list of Range objects)
        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

        """Get the file size."""
            n = self._segments[-1]
            return n.range_start + n.range_size

    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        # Render this file's segments as manifest text for `stream_name`.
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))

    def _reparent(self, newparent, newname):
        # Move this file under a new parent collection.
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.lock = self.parent.root_collection().lock
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.
    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

        # size(): delegate to the underlying ArvadosFile.
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        # Fill buffer `b` in place; returns the count in the full source.
        data = self.read(len(b))
        b[:len(data)] = data

    @_FileLikeObjectBase._before_close
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
            # size is None: drain the file a Keep block at a time.
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.
    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            # Append mode always writes at end-of-file.
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq, num_retries=None):
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
            # size defaults to the current position, like io.IOBase.truncate.
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
        # flush() body:
        self.arvadosfile.flush()

    def close(self, flush=True):
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.
    """
    # Thin delegating adapter: every method forwards to the wrapped file
    # object `self.f`.
    def __init__(self, f):

        return self.f.close()

        return self.f.flush()
    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)

        return self.f.readable()
    def readinto(self, *args, **kwargs):
        return self.f.readinto(*args, **kwargs)
    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)

        return self.f.seekable()

        return self.f.tell()

        return self.f.writable()
    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)