1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
21 from ._internal import streams
22 from .errors import KeepWriteError, AssertionError, ArgumentError
23 from .keep import KeepLocator
24 from .retry import retry_method
29 _logger = logging.getLogger('arvados.arvfile')
32 """split(path) -> streamname, filename
34 Separate the stream name and file name in a /-separated stream path and
35 return a tuple (stream_name, file_name). If no stream name is available, assume '.'.
40 stream_name, file_name = path.rsplit('/', 1)
41 except ValueError: # No / in string
42 stream_name, file_name = '.', path
43 return stream_name, file_name
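# Illustrative sketch (not part of the original module): how split() behaves
# with and without a stream component in the path.
#
#   >>> split('data/reads/sample1.fastq')
#   ('data/reads', 'sample1.fastq')
#   >>> split('sample1.fastq')
#   ('.', 'sample1.fastq')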
46 class UnownedBlockError(Exception):
47 """Raised when there's an writable block without an owner on the BlockManager."""
51 class _FileLikeObjectBase(object):
52 def __init__(self, name, mode):
58 def _before_close(orig_func):
59 @functools.wraps(orig_func)
60 def before_close_wrapper(self, *args, **kwargs):
62 raise ValueError("I/O operation on closed stream file")
63 return orig_func(self, *args, **kwargs)
64 return before_close_wrapper
69 def __exit__(self, exc_type, exc_value, traceback):
80 class ArvadosFileReaderBase(_FileLikeObjectBase):
81 def __init__(self, name, mode, num_retries=None):
82 super(ArvadosFileReaderBase, self).__init__(name, mode)
84 self.num_retries = num_retries
85 self._readline_cache = (None, None)
89 data = self.readline()
94 def decompressed_name(self):
95 return re.sub(r'\.(bz2|gz)$', '', self.name)
97 @_FileLikeObjectBase._before_close
98 def seek(self, pos, whence=os.SEEK_SET):
99 if whence == os.SEEK_CUR:
101 elif whence == os.SEEK_END:
104 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
120 @_FileLikeObjectBase._before_close
122 def readall(self, size=2**20, num_retries=None):
124 data = self.read(size, num_retries=num_retries)
129 @_FileLikeObjectBase._before_close
131 def readline(self, size=float('inf'), num_retries=None):
132 cache_pos, cache_data = self._readline_cache
133 if self.tell() == cache_pos:
135 self._filepos += len(cache_data)
138 data_size = len(data[-1])
139 while (data_size < size) and (b'\n' not in data[-1]):
140 next_read = self.read(2 ** 20, num_retries=num_retries)
143 data.append(next_read)
144 data_size += len(next_read)
145 data = b''.join(data)
147 nextline_index = data.index(b'\n') + 1
149 nextline_index = len(data)
150 nextline_index = min(nextline_index, size)
151 self._filepos -= len(data) - nextline_index
152 self._readline_cache = (self.tell(), data[nextline_index:])
153 return data[:nextline_index].decode()
155 @_FileLikeObjectBase._before_close
157 def decompress(self, decompress, size, num_retries=None):
158 for segment in self.readall(size, num_retries=num_retries):
159 data = decompress(segment)
163 @_FileLikeObjectBase._before_close
165 def readall_decompressed(self, size=2**20, num_retries=None):
167 if self.name.endswith('.bz2'):
168 dc = bz2.BZ2Decompressor()
169 return self.decompress(dc.decompress, size,
170 num_retries=num_retries)
171 elif self.name.endswith('.gz'):
172 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
173 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
174 size, num_retries=num_retries)
176 return self.readall(size, num_retries=num_retries)
178 @_FileLikeObjectBase._before_close
180 def readlines(self, sizehint=float('inf'), num_retries=None):
183 for s in self.readall(num_retries=num_retries):
186 if data_size >= sizehint:
188 return b''.join(data).decode().splitlines(True)
191 raise IOError(errno.ENOSYS, "Not implemented")
193 def read(self, size, num_retries=None):
194 raise IOError(errno.ENOSYS, "Not implemented")
196 def readfrom(self, start, size, num_retries=None):
197 raise IOError(errno.ENOSYS, "Not implemented")
200 def synchronized(orig_func):
201 @functools.wraps(orig_func)
202 def synchronized_wrapper(self, *args, **kwargs):
204 return orig_func(self, *args, **kwargs)
205 return synchronized_wrapper
208 class StateChangeError(Exception):
209 def __init__(self, message, state, nextstate):
210 super(StateChangeError, self).__init__(message)
212 self.nextstate = nextstate
214 class _BufferBlock(object):
215 """A stand-in for a Keep block that is in the process of being written.
217 Writers can append to it, get the size, and compute the Keep locator.
218 There are three valid states:
224 Block is in the process of being uploaded to Keep, append is an error.
227 The block has been written to Keep, its internal buffer has been
228 released, fetching the block will fetch it via keep client (since we
229 discarded the internal copy), and identifiers referring to the BufferBlock
230 can be replaced with the block locator.
240 def __init__(self, blockid, starting_capacity, owner):
243 the identifier for this block
246 the initial buffer capacity
249 ArvadosFile that owns this block
252 self.blockid = blockid
253 self.buffer_block = bytearray(starting_capacity)
254 self.buffer_view = memoryview(self.buffer_block)
255 self.write_pointer = 0
256 self._state = _BufferBlock.WRITABLE
259 self.lock = threading.Lock()
260 self.wait_for_commit = threading.Event()
264 def append(self, data):
265 """Append some data to the buffer.
267 Only valid if the block is in WRITABLE state. Implements an expanding
268 buffer, doubling capacity as needed to accommodate all the data.
271 if self._state == _BufferBlock.WRITABLE:
272 if not isinstance(data, bytes) and not isinstance(data, memoryview):
274 while (self.write_pointer+len(data)) > len(self.buffer_block):
275 new_buffer_block = bytearray(len(self.buffer_block) * 2)
276 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
277 self.buffer_block = new_buffer_block
278 self.buffer_view = memoryview(self.buffer_block)
279 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
280 self.write_pointer += len(data)
283 raise AssertionError("Buffer block is not writable")
285 STATE_TRANSITIONS = frozenset([
287 (PENDING, COMMITTED),
292 def set_state(self, nextstate, val=None):
293 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
294 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
295 self._state = nextstate
297 if self._state == _BufferBlock.PENDING:
298 self.wait_for_commit.clear()
300 if self._state == _BufferBlock.COMMITTED:
302 self.buffer_view = None
303 self.buffer_block = None
304 self.wait_for_commit.set()
306 if self._state == _BufferBlock.ERROR:
308 self.wait_for_commit.set()
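# Illustrative sketch (hedged; assumes the transitions listed in the
# STATE_TRANSITIONS table above): the normal lifecycle is
# WRITABLE -> PENDING -> COMMITTED, with PENDING -> ERROR on a failed upload
# and ERROR -> PENDING when the upload is retried.
#
#   bb = _BufferBlock('bufferblock0', starting_capacity=2**14, owner=None)
#   bb.append(b'hello')                                 # only legal while WRITABLE
#   bb.set_state(_BufferBlock.PENDING)                  # upload started; append() now raises
#   bb.set_state(_BufferBlock.COMMITTED, 'locator+5')   # buffer released, locator recorded
#   bb.set_state(_BufferBlock.WRITABLE)                 # not a listed transition: StateChangeError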
315 """The amount of data written to the buffer."""
316 return self.write_pointer
320 """The Keep locator for this buffer's contents."""
321 if self._locator is None:
322 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
326 def clone(self, new_blockid, owner):
327 if self._state == _BufferBlock.COMMITTED:
328 raise AssertionError("Cannot duplicate committed buffer block")
329 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
330 bufferblock.append(self.buffer_view[0:self.size()])
335 self._state = _BufferBlock.DELETED
337 self.buffer_block = None
338 self.buffer_view = None
341 def repack_writes(self):
342 """Optimize buffer block by repacking segments in file sequence.
344 When the client makes random writes, they appear in the buffer block in
345 the sequence they were written rather than the sequence they appear in
346 the file. This makes for inefficient, fragmented manifests. Attempt
347 to optimize by repacking writes in file sequence.
350 if self._state != _BufferBlock.WRITABLE:
351 raise AssertionError("Cannot repack non-writable block")
353 segs = self.owner.segments()
355 # Collect the segments that reference the buffer block.
356 bufferblock_segs = [s for s in segs if s.locator == self.blockid]
358 # Collect total data referenced by segments (could be smaller than
359 # bufferblock size if a portion of the file was written and then overwritten).
361 write_total = sum([s.range_size for s in bufferblock_segs])
363 if write_total < self.size() or len(bufferblock_segs) > 1:
364 # If there's more than one segment referencing this block, it is
365 # due to out-of-order writes and will produce a fragmented
366 # manifest, so try to optimize by re-packing into a new buffer.
367 contents = self.buffer_view[0:self.write_pointer].tobytes()
368 new_bb = _BufferBlock(None, write_total, None)
369 for t in bufferblock_segs:
370 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
371 t.segment_offset = new_bb.size() - t.range_size
373 self.buffer_block = new_bb.buffer_block
374 self.buffer_view = new_bb.buffer_view
375 self.write_pointer = new_bb.write_pointer
378 self.owner.set_segments(segs)
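# Illustrative sketch (hypothetical Range values): if a later part of the file
# is written before an earlier part, the buffer holds the bytes in write order
# while the segments reference them in file order, e.g.
#
#   [Range(blockid, range_start=0,  range_size=10, segment_offset=10),
#    Range(blockid, range_start=10, range_size=10, segment_offset=0)]
#
# repack_writes() copies the referenced bytes into a new buffer in file order,
# so the segment offsets become 0 and 10 and the resulting manifest stays
# compact instead of fragmenting into out-of-order references.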
381 return "<BufferBlock %s>" % (self.blockid)
384 class NoopLock(object):
388 def __exit__(self, exc_type, exc_value, traceback):
391 def acquire(self, blocking=False):
398 def must_be_writable(orig_func):
399 @functools.wraps(orig_func)
400 def must_be_writable_wrapper(self, *args, **kwargs):
401 if not self.writable():
402 raise IOError(errno.EROFS, "Collection is read-only.")
403 return orig_func(self, *args, **kwargs)
404 return must_be_writable_wrapper
407 class _BlockManager(object):
408 """BlockManager handles buffer blocks.
410 Also handles background block uploads, and background block prefetch for a
411 Collection of ArvadosFiles.
415 DEFAULT_PUT_THREADS = 2
417 def __init__(self, keep,
421 storage_classes_func=None):
422 """keep: KeepClient object to use"""
424 self._bufferblocks = collections.OrderedDict()
425 self._put_queue = None
426 self._put_threads = None
427 self.lock = threading.Lock()
428 self.prefetch_lookahead = self._keep.num_prefetch_threads
429 self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
431 self.storage_classes = storage_classes_func or (lambda: [])
432 self._pending_write_size = 0
433 self.threads_lock = threading.Lock()
434 self.padding_block = None
435 self.num_retries = num_retries
438 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
439 """Allocate a new, empty bufferblock in WRITABLE state and return it.
442 optional block identifier, otherwise one will be automatically assigned
445 optional capacity, otherwise will use default capacity
448 ArvadosFile that owns this block
451 return self._alloc_bufferblock(blockid, starting_capacity, owner)
453 def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
455 blockid = str(uuid.uuid4())
456 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
457 self._bufferblocks[bufferblock.blockid] = bufferblock
461 def dup_block(self, block, owner):
462 """Create a new bufferblock initialized with the content of an existing bufferblock.
465 the buffer block to copy.
468 ArvadosFile that owns the new block
471 new_blockid = str(uuid.uuid4())
472 bufferblock = block.clone(new_blockid, owner)
473 self._bufferblocks[bufferblock.blockid] = bufferblock
477 def is_bufferblock(self, locator):
478 return locator in self._bufferblocks
480 def _commit_bufferblock_worker(self):
481 """Background uploader thread."""
485 bufferblock = self._put_queue.get()
486 if bufferblock is None:
489 if self.copies is None:
490 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
492 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
493 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
494 except Exception as e:
495 bufferblock.set_state(_BufferBlock.ERROR, e)
497 if self._put_queue is not None:
498 self._put_queue.task_done()
500 def start_put_threads(self):
501 with self.threads_lock:
502 if self._put_threads is None:
503 # Start uploader threads.
505 # If we don't limit the Queue size, the upload queue can quickly
506 # grow to take up gigabytes of RAM if the writing process is
507 # generating data more quickly than it can be sent to the Keep servers.
510 # With two upload threads and a queue size of 2, this means up to 4
511 # blocks pending. If they are full 64 MiB blocks, that means up to
512 # 256 MiB of internal buffering, which is the same size as the
513 # default download block cache in KeepClient.
514 self._put_queue = queue.Queue(maxsize=2)
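# Worked numbers for the comment above (assuming the default 64 MiB
# KEEP_BLOCK_SIZE and DEFAULT_PUT_THREADS = 2): 2 uploader threads each
# holding one block + 2 queued blocks = 4 blocks * 64 MiB = 256 MiB of
# buffered data at most.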
516 self._put_threads = []
517 for i in range(0, self.num_put_threads):
518 thread = threading.Thread(target=self._commit_bufferblock_worker)
519 self._put_threads.append(thread)
524 def stop_threads(self):
525 """Shut down and wait for background upload and download threads to finish."""
527 if self._put_threads is not None:
528 for t in self._put_threads:
529 self._put_queue.put(None)
530 for t in self._put_threads:
532 self._put_threads = None
533 self._put_queue = None
538 def __exit__(self, exc_type, exc_value, traceback):
542 def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
543 """Packs small blocks together before uploading"""
545 self._pending_write_size += closed_file_size
547 # Check if there are enough small blocks to fill up one block completely
548 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
551 # Find blocks that are ready to be packed together before being committed to Keep.
553 # A WRITABLE block always has an owner.
554 # A WRITABLE block with its owner.closed() implies that its
555 # size is <= KEEP_BLOCK_SIZE/2.
557 small_blocks = [b for b in self._bufferblocks.values()
558 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
559 except AttributeError:
560 # Writable blocks without an owner shouldn't exist.
561 raise UnownedBlockError()
563 if len(small_blocks) <= 1:
564 # Not enough small blocks for repacking
567 for bb in small_blocks:
570 # Update the pending write size count with its true value, just in case
571 # some small file was opened, written and closed several times.
572 self._pending_write_size = sum([b.size() for b in small_blocks])
574 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
577 new_bb = self._alloc_bufferblock()
580 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
581 bb = small_blocks.pop(0)
582 new_bb.owner.append(bb.owner)
583 self._pending_write_size -= bb.size()
584 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
585 files.append((bb, new_bb.write_pointer - bb.size()))
587 self.commit_bufferblock(new_bb, sync=sync)
589 for bb, new_bb_segment_offset in files:
590 newsegs = bb.owner.segments()
592 if s.locator == bb.blockid:
593 s.locator = new_bb.blockid
594 s.segment_offset = new_bb_segment_offset+s.segment_offset
595 bb.owner.set_segments(newsegs)
596 self._delete_bufferblock(bb.blockid)
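# Illustrative sketch (hypothetical sizes): suppose four files of 20 MiB each
# have been written and closed.  Their four WRITABLE buffer blocks total
# 80 MiB, which crosses KEEP_BLOCK_SIZE (64 MiB), so packing kicks in: the
# loop above copies the first three blocks (60 MiB) into new_bb, commits
# new_bb once, rewrites each of those files' segments to point at new_bb at
# the recorded offset, and deletes the old blocks.  The fourth file waits for
# a later pass.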
598 def commit_bufferblock(self, block, sync):
599 """Initiate a background upload of a bufferblock.
602 The block object to upload
605 If `sync` is True, upload the block synchronously.
606 If `sync` is False, upload the block asynchronously. This will
607 return immediately unless the upload queue is at capacity, in
608 which case it will wait on an upload queue slot.
612 # Mark the block as PENDING to disallow any more appends.
613 block.set_state(_BufferBlock.PENDING)
614 except StateChangeError as e:
615 if e.state == _BufferBlock.PENDING:
617 block.wait_for_commit.wait()
620 if block.state() == _BufferBlock.COMMITTED:
622 elif block.state() == _BufferBlock.ERROR:
629 if self.copies is None:
630 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
632 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
633 block.set_state(_BufferBlock.COMMITTED, loc)
634 except Exception as e:
635 block.set_state(_BufferBlock.ERROR, e)
638 self.start_put_threads()
639 self._put_queue.put(block)
642 def get_bufferblock(self, locator):
643 return self._bufferblocks.get(locator)
646 def get_padding_block(self):
647 """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
648 when using truncate() to extend the size of a file.
650 For reference (and possible future optimization), the md5sum of the
651 padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
655 if self.padding_block is None:
656 self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
657 self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
658 self.commit_bufferblock(self.padding_block, False)
659 return self.padding_block
662 def delete_bufferblock(self, locator):
663 self._delete_bufferblock(locator)
665 def _delete_bufferblock(self, locator):
666 if locator in self._bufferblocks:
667 bb = self._bufferblocks[locator]
669 del self._bufferblocks[locator]
671 def get_block_contents(self, locator, num_retries, cache_only=False):
674 First checks whether the locator refers to a BufferBlock and, if so, returns
675 its contents; otherwise passes the request through to KeepClient.get().
679 if locator in self._bufferblocks:
680 bufferblock = self._bufferblocks[locator]
681 if bufferblock.state() != _BufferBlock.COMMITTED:
682 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
684 locator = bufferblock._locator
686 return self._keep.get_from_cache(locator)
688 return self._keep.get(locator, num_retries=num_retries)
690 def commit_all(self):
691 """Commit all outstanding buffer blocks.
693 This is a synchronous call, and will not return until all buffer blocks
694 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
697 self.repack_small_blocks(force=True, sync=True)
700 items = list(self._bufferblocks.items())
703 if v.state() != _BufferBlock.COMMITTED and v.owner:
704 # Skip blocks with a list of owners: if they're not in COMMITTED
705 # state, they're already being committed asynchronously.
706 if isinstance(v.owner, ArvadosFile):
707 v.owner.flush(sync=False)
710 if self._put_queue is not None:
711 self._put_queue.join()
715 if v.state() == _BufferBlock.ERROR:
716 err.append((v.locator(), v.error))
718 raise KeepWriteError("Error writing some blocks", err, label="block")
721 # flush again with sync=True to remove committed bufferblocks from the segments.
724 if isinstance(v.owner, ArvadosFile):
725 v.owner.flush(sync=True)
726 elif isinstance(v.owner, list) and len(v.owner) > 0:
727 # This bufferblock is referenced by many files as a result
728 # of repacking small blocks, so don't delete it when flushing
729 # its owners, just do it after flushing them all.
730 for owner in v.owner:
731 owner.flush(sync=True)
732 self.delete_bufferblock(k)
736 def block_prefetch(self, locator):
737 """Initiate a background download of a block.
740 if not self.prefetch_lookahead:
744 if locator in self._bufferblocks:
747 self._keep.block_prefetch(locator)
750 class ArvadosFile(object):
751 """Represent a file in a Collection.
753 ArvadosFile manages the underlying representation of a file in Keep as a
754 sequence of segments spanning a set of blocks, and implements random
757 This object may be accessed from multiple threads.
761 __slots__ = ('parent', 'name', '_writers', '_committed',
762 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
764 def __init__(self, parent, name, stream=[], segments=[]):
766 ArvadosFile constructor.
769 a list of Range objects representing a block stream
772 a list of Range objects representing segments
776 self._writers = set()
777 self._committed = False
779 self.lock = parent.root_collection().lock
781 self._add_segment(stream, s.locator, s.range_size)
782 self._current_bblock = None
783 self._read_counter = 0
786 return self.parent.writable()
789 def permission_expired(self, as_of_dt=None):
790 """Returns True if any of the segment's locators is expired"""
791 for r in self._segments:
792 if KeepLocator(r.locator).permission_expired(as_of_dt):
797 def has_remote_blocks(self):
798 """Returns True if any of the segment's locators has a +R signature"""
800 for s in self._segments:
801 if '+R' in s.locator:
806 def _copy_remote_blocks(self, remote_blocks={}):
807 """Ask Keep to copy remote blocks and point to their local copies.
809 This is called from the parent Collection.
812 Shared cache of remote to local block mappings. This is used to avoid
813 doing extra work when blocks are shared by more than one file in
814 different subdirectories.
817 for s in self._segments:
818 if '+R' in s.locator:
820 loc = remote_blocks[s.locator]
822 loc = self.parent._my_keep().refresh_signature(s.locator)
823 remote_blocks[s.locator] = loc
825 self.parent.set_committed(False)
830 return copy.copy(self._segments)
833 def clone(self, new_parent, new_name):
834 """Make a copy of this file."""
835 cp = ArvadosFile(new_parent, new_name)
836 cp.replace_contents(self)
841 def replace_contents(self, other):
842 """Replace segments of this file with segments from another `ArvadosFile` object."""
846 for other_segment in other.segments():
847 new_loc = other_segment.locator
848 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
849 if other_segment.locator not in map_loc:
850 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
851 if bufferblock.state() != _BufferBlock.WRITABLE:
852 map_loc[other_segment.locator] = bufferblock.locator()
854 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
855 new_loc = map_loc[other_segment.locator]
857 self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
859 self.set_committed(False)
861 def __eq__(self, other):
864 if not isinstance(other, ArvadosFile):
867 othersegs = other.segments()
869 if len(self._segments) != len(othersegs):
871 for i in range(0, len(othersegs)):
872 seg1 = self._segments[i]
877 if self.parent._my_block_manager().is_bufferblock(loc1):
878 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
880 if other.parent._my_block_manager().is_bufferblock(loc2):
881 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
883 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
884 seg1.range_start != seg2.range_start or
885 seg1.range_size != seg2.range_size or
886 seg1.segment_offset != seg2.segment_offset):
891 def __ne__(self, other):
892 return not self.__eq__(other)
895 def set_segments(self, segs):
896 self._segments = segs
899 def set_committed(self, value=True):
900 """Set committed flag.
902 If value is True, set committed to be True.
904 If value is False, set committed to be False for this and all parents.
906 if value == self._committed:
908 self._committed = value
909 if self._committed is False and self.parent is not None:
910 self.parent.set_committed(False)
914 """Get whether this is committed or not."""
915 return self._committed
918 def add_writer(self, writer):
919 """Add an ArvadosFileWriter reference to the list of writers"""
920 if isinstance(writer, ArvadosFileWriter):
921 self._writers.add(writer)
924 def remove_writer(self, writer, flush):
926 Called from ArvadosFileWriter.close(). Remove a writer reference from the list
927 and do some block maintenance tasks.
929 self._writers.remove(writer)
931 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
932 # File writer closed, not small enough for repacking
935 # All writers closed and size is adequate for repacking
936 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
940 Get whether this is closed or not. When the writers list is empty, the file
941 is considered closed.
943 return len(self._writers) == 0
947 def truncate(self, size):
948 """Shrink or expand the size of the file.
950 If `size` is less than the size of the file, the file contents after
951 `size` will be discarded. If `size` is greater than the current size
952 of the file, it will be filled with zero bytes.
955 if size < self.size():
957 for r in self._segments:
958 range_end = r.range_start+r.range_size
959 if r.range_start >= size:
960 # segment is past the truncate size, all done
962 elif size < range_end:
963 nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
964 nr.segment_offset = r.segment_offset
970 self._segments = new_segs
971 self.set_committed(False)
972 elif size > self.size():
973 padding = self.parent._my_block_manager().get_padding_block()
974 diff = size - self.size()
975 while diff > config.KEEP_BLOCK_SIZE:
976 self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
977 diff -= config.KEEP_BLOCK_SIZE
979 self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
980 self.set_committed(False)
982 # size == self.size()
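# Illustrative sketch (hypothetical sizes): truncate(130 * 2**20) on an empty
# file appends segments that all reference the shared zero-filled padding
# block: two full 64 MiB ranges from the while loop above plus a final 2 MiB
# range for the remainder, giving exactly 130 MiB of zeros without buffering
# any new data.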
985 def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
986 """Read up to `size` bytes from the file starting at `offset`.
990 * exact: bool --- If False (default), return less data than
991 requested if the read crosses a block boundary and the next
992 block isn't cached. If True, only return less data than
993 requested when hitting EOF.
995 * return_memoryview: bool --- If False (default) return a
996 `bytes` object, which may entail making a copy in some
997 situations. If True, return a `memoryview` object which may
998 avoid making a copy, but may be incompatible with code
999 expecting a `bytes` object.
1004 if size == 0 or offset >= self.size():
1005 return memoryview(b'') if return_memoryview else b''
1006 readsegs = streams.locators_and_ranges(self._segments, offset, size)
1009 prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1010 if prefetch_lookahead:
1011 # Doing prefetch on every read() call is surprisingly expensive
1012 # when we're trying to deliver data at 600+ MiBps and want
1013 # the read() fast path to be as lightweight as possible.
1015 # Only prefetching every 128 read operations
1016 # dramatically reduces the overhead while still
1017 # getting the benefit of prefetching (e.g. when
1018 # reading 128 KiB at a time, it checks for prefetch every 16 MiB).
1020 self._read_counter = (self._read_counter+1) % 128
1021 if self._read_counter == 1:
1022 prefetch = streams.locators_and_ranges(
1025 config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1026 limit=(1+prefetch_lookahead),
1032 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1034 blockview = memoryview(block)
1035 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1036 locs.add(lr.locator)
1042 if lr.locator not in locs:
1043 self.parent._my_block_manager().block_prefetch(lr.locator)
1044 locs.add(lr.locator)
1047 return data[0] if return_memoryview else data[0].tobytes()
1049 return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
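# Illustrative note on `exact` (hedged): with exact=False, every block after
# the first is requested with cache_only=True, so a read that spans a block
# boundary may stop early and return only the cached prefix.  Callers that
# must get the full range, such as ArvadosFileReader.readfrom() below, pass
# exact=True and accept blocking on Keep fetches instead.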
1054 def writeto(self, offset, data, num_retries):
1055 """Write `data` to the file starting at `offset`.
1057 This will update existing bytes and/or extend the size of the file as necessary.
1061 if not isinstance(data, bytes) and not isinstance(data, memoryview):
1062 data = data.encode()
1066 if offset > self.size():
1067 self.truncate(offset)
1069 if len(data) > config.KEEP_BLOCK_SIZE:
1070 # Chunk it up into smaller writes
1072 dataview = memoryview(data)
1073 while n < len(data):
1074 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1075 n += config.KEEP_BLOCK_SIZE
1078 self.set_committed(False)
1080 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1081 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1083 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1084 self._current_bblock.repack_writes()
1085 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1086 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1087 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1089 self._current_bblock.append(data)
1090 streams.replace_range(
1094 self._current_bblock.blockid,
1095 self._current_bblock.write_pointer - len(data),
1097 self.parent.notify(WRITE, self.parent, self.name, (self, self))
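# Illustrative sketch (hypothetical sizes): a single writeto(0, data, ...) call
# with 150 MiB of data takes the chunking branch above and re-invokes itself
# three times: 64 MiB at offset 0, 64 MiB at offset 64 MiB, and 22 MiB at
# offset 128 MiB.  Each chunk then follows the normal path: append to the
# current buffer block, or commit it and allocate a new one when it would
# overflow KEEP_BLOCK_SIZE.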
1101 def flush(self, sync=True, num_retries=0):
1102 """Flush the current bufferblock to Keep.
1105 If True, commit block synchronously, wait until buffer block has been written.
1106 If False, commit block asynchronously, return immediately after putting the block into the Keep put queue.
1109 if self.committed():
1112 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1113 if self._current_bblock.state() == _BufferBlock.WRITABLE:
1114 self._current_bblock.repack_writes()
1115 if self._current_bblock.state() != _BufferBlock.DELETED:
1116 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1120 for s in self._segments:
1121 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1123 if bb.state() != _BufferBlock.COMMITTED:
1124 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1125 to_delete.add(s.locator)
1126 s.locator = bb.locator()
1128 # Don't delete the bufferblock if it's owned by many files. It'll be
1129 # deleted after all of its owners are flush()ed.
1130 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1131 self.parent._my_block_manager().delete_bufferblock(s)
1133 self.parent.notify(MOD, self.parent, self.name, (self, self))
1137 def add_segment(self, blocks, pos, size):
1138 """Add a segment to the end of the file.
1140 `pos` and `size` reference a section of the stream described by
1141 `blocks` (a list of Range objects)
1144 self._add_segment(blocks, pos, size)
1146 def _add_segment(self, blocks, pos, size):
1147 """Internal implementation of add_segment."""
1148 self.set_committed(False)
1149 for lr in streams.locators_and_ranges(blocks, pos, size):
1150 last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0)
1151 r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1152 self._segments.append(r)
1156 """Get the file size."""
1158 n = self._segments[-1]
1159 return n.range_start + n.range_size
1164 def manifest_text(self, stream_name=".", portable_locators=False,
1165 normalize=False, only_committed=False):
1168 for segment in self._segments:
1169 loc = segment.locator
1170 if self.parent._my_block_manager().is_bufferblock(loc):
1173 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1174 if portable_locators:
1175 loc = KeepLocator(loc).stripped()
1176 filestream.append(streams.LocatorAndRange(
1178 KeepLocator(loc).size,
1179 segment.segment_offset,
1182 buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
1188 def _reparent(self, newparent, newname):
1189 self.set_committed(False)
1190 self.flush(sync=True)
1191 self.parent.remove(self.name)
1192 self.parent = newparent
1194 self.lock = self.parent.root_collection().lock
1197 class ArvadosFileReader(ArvadosFileReaderBase):
1198 """Wraps ArvadosFile in a file-like object supporting reading only.
1200 Be aware that this class is NOT thread safe as there is no locking around
1201 updating the file pointer.
1205 def __init__(self, arvadosfile, mode="r", num_retries=None):
1206 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1207 self.arvadosfile = arvadosfile
1210 return self.arvadosfile.size()
1212 def stream_name(self):
1213 return self.arvadosfile.parent.stream_name()
1215 def readinto(self, b):
1216 data = self.read(len(b))
1217 b[:len(data)] = data
1220 @_FileLikeObjectBase._before_close
1222 def read(self, size=-1, num_retries=None, return_memoryview=False):
1223 """Read up to `size` bytes from the file and return the result.
1225 Starts at the current file position. If `size` is negative or None,
1226 read the entire remainder of the file.
1228 Returns an empty result if the file pointer is already at the end of the file.
1230 Returns a `bytes` object, unless `return_memoryview` is True,
1231 in which case it returns a memory view, which may avoid an
1232 unnecessary data copy in some situations.
1235 if size < 0 or size is None:
1238 # specify exact=False, return_memoryview=True here so that we
1239 # only copy data once into the final buffer.
1241 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1244 self._filepos += len(rd)
1245 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1246 return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1248 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1249 self._filepos += len(data)
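# Illustrative usage (assuming `reader` is an open ArvadosFileReader):
#
#   chunk = reader.read(1024)   # at most 1024 bytes; shorter only at EOF
#   rest = reader.read()        # negative/None size: the whole remainder
#   reader.read(0)              # returns b'' without touching Keep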
1252 @_FileLikeObjectBase._before_close
1254 def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1255 """Read up to `size` bytes from the stream, starting at the specified file offset.
1257 This method does not change the file position.
1259 Returns a `bytes` object, unless `return_memoryview` is True,
1260 in which case it returns a memory view, which may avoid an
1261 unnecessary data copy in some situations.
1264 return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1270 class ArvadosFileWriter(ArvadosFileReader):
1271 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1273 Be aware that this class is NOT thread safe as there is no locking around
1274 updating the file pointer.
1278 def __init__(self, arvadosfile, mode, num_retries=None):
1279 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1280 self.arvadosfile.add_writer(self)
1285 @_FileLikeObjectBase._before_close
1287 def write(self, data, num_retries=None):
1288 if self.mode[0] == "a":
1289 self._filepos = self.size()
1290 self.arvadosfile.writeto(self._filepos, data, num_retries)
1291 self._filepos += len(data)
1294 @_FileLikeObjectBase._before_close
1296 def writelines(self, seq, num_retries=None):
1298 self.write(s, num_retries=num_retries)
1300 @_FileLikeObjectBase._before_close
1301 def truncate(self, size=None):
1303 size = self._filepos
1304 self.arvadosfile.truncate(size)
1306 @_FileLikeObjectBase._before_close
1308 self.arvadosfile.flush()
1310 def close(self, flush=True):
1312 self.arvadosfile.remove_writer(self, flush)
1313 super(ArvadosFileWriter, self).close()
1316 class WrappableFile(object):
1317 """An interface to an Arvados file that's compatible with io wrappers.
1320 def __init__(self, f):
1325 return self.f.close()
1327 return self.f.flush()
1328 def read(self, *args, **kwargs):
1329 return self.f.read(*args, **kwargs)
1331 return self.f.readable()
1332 def readinto(self, *args, **kwargs):
1333 return self.f.readinto(*args, **kwargs)
1334 def seek(self, *args, **kwargs):
1335 return self.f.seek(*args, **kwargs)
1337 return self.f.seekable()
1339 return self.f.tell()
1341 return self.f.writable()
1342 def write(self, *args, **kwargs):
1343 return self.f.write(*args, **kwargs)
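# Illustrative usage (an assumption about typical use, not taken from this
# file): WrappableFile exposes the io-style methods the standard library
# wrappers expect, so an Arvados file object can be wrapped for buffered
# binary or text access, e.g.
#
#   import io
#   binary = io.BufferedReader(WrappableFile(reader))
#   text = io.TextIOWrapper(io.BufferedReader(WrappableFile(reader)), encoding='utf-8')
#
# where `reader` is an ArvadosFileReader opened elsewhere.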