# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import listitems, listvalues
standard_library.install_aliases()
from builtins import range
from builtins import object

import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')


def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
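
# Illustrative behavior of split(), per the docstring above (comment-only
# sketch, not executed):
#
#     split('output/logs/stderr.txt')  # -> ('output/logs', 'stderr.txt')
#     split('stderr.txt')              # -> ('.', 'stderr.txt')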


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect a gzip header and trailer.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
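
    # Usage sketch (illustrative only; `reader` stands for any subclass
    # instance and the filenames are hypothetical).  Decompression is chosen
    # purely by the .bz2/.gz name suffix, otherwise raw chunks are yielded:
    #
    #     with open('data.txt', 'wb') as out:
    #         for chunk in reader.readall_decompressed():
    #             out.write(chunk)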

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
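
    # Illustrative output sketch (hypothetical values): for a 3-byte file
    # "foo.txt" stored in a single block, the returned manifest line looks
    # like
    #
    #     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n
    #
    # i.e. stream name, block locators, then position:size:filename tokens.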


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    The three primary states are:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    (ERROR and DELETED cover failed uploads and cleared buffers.)
    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.error = None
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
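
    # Capacity growth sketch: starting from the default 2**14 (16 KiB)
    # bytearray, appending e.g. 100,000 bytes doubles the buffer
    # 16 KiB -> 32 KiB -> 64 KiB -> 128 KiB before the bytes are copied in,
    # keeping amortized append cost linear.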

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, ERROR)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
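
    # State machine implied by STATE_TRANSITIONS and set_state():
    #
    #     WRITABLE --> PENDING --> COMMITTED
    #                     |
    #                     +------> ERROR
    #
    # DELETED is reached via clear()/repack_writes(), not via set_state().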

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block
        in the sequence they were written rather than the sequence they appear
        in the file.  This makes for inefficient, fragmented manifests.
        Attempt to optimize by repacking writes in file sequence.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)
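
    # Worked example: writing b"bbb" at file offset 3 and then b"aaa" at
    # offset 0 leaves the buffer holding b"bbbaaa" with two segments pointing
    # into it out of order; repacking rewrites the buffer to b"aaabbb" so the
    # segments fall in file order and the manifest stays unfragmented.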

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self.storage_classes = storage_classes if storage_classes is not None else []
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self.num_retries = num_retries

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # service.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        new_bb.owner = []
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block
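
    # Design note (sketch): the padding block is allocated and uploaded at
    # most once per BlockManager; ArvadosFile.truncate() (below) then appends
    # Range references into this shared zero-filled block rather than
    # uploading new zeros for every extended file.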

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = listitems(self._bufferblocks)

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners, as if they're not in COMMITTED
                # state, they're already being committed asynchronously.
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=True)
                elif isinstance(v.owner, list) and len(v.owner) > 0:
                    # This bufferblock is referenced by many files as a result
                    # of repacking small blocks, so don't delete it when flushing
                    # its owners, just do it after flushing them all.
                    for owner in v.owner:
                        owner.flush(sync=True)
                    self.delete_bufferblock(k)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segments' locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segments' locators has a +R signature"""

        for s in self._segments:
            if '+R' in s.locator:
                return True
        return False

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
          Shared cache of remote to local block mappings. This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories.

        """
        if remote_blocks is None:
            remote_blocks = {}
        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close().  Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size(), nothing to do
            pass
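
    # Worked example (hypothetical values): with one segment
    # Range(loc, 0, 100, 0), truncate(40) replaces it with Range(loc, 0, 40, 0)
    # (same block, shorter range).  Extending an empty file with
    # truncate(150 * 2**20) instead appends padding ranges of
    # 64 MiB + 64 MiB + 22 MiB, all pointing at the shared zero block.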

    @synchronized
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
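
    # Worked example: a single 150 MiB writeto() call is split into
    # 64 MiB + 64 MiB + 22 MiB recursive writes, so no buffer block grows
    # past KEEP_BLOCK_SIZE; each chunk is appended to the current (or a
    # freshly allocated) buffer block and replace_range() splices the new
    # segment into self._segments.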

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files.  It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass
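
    # Usage sketch (illustrative; `manifest_text` is a hypothetical valid
    # manifest): readers are normally obtained from a Collection rather than
    # constructed directly.
    #
    #     import arvados.collection
    #     c = arvados.collection.Collection(manifest_text)
    #     with c.open('foo.txt', 'rb') as reader:
    #         header = reader.readfrom(0, 64, num_retries=3)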


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()


class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.

    """
    def __init__(self, f):
        self.f = f
        self.closed = False
    def close(self):
        self.closed = True
        return self.f.close()
    def flush(self):
        return self.f.flush()
    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)
    def readable(self):
        return self.f.readable()
    def readinto(self, *args, **kwargs):
        return self.f.readinto(*args, **kwargs)
    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)
    def seekable(self):
        return self.f.seekable()
    def tell(self):
        return self.f.tell()
    def writable(self):
        return self.f.writable()
    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)
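
# Usage sketch (illustrative; `binary_reader` and `process` are hypothetical):
# because WrappableFile exposes the io-style method set (read, readinto,
# seek, tell, readable, ...), it can sit under standard wrappers, e.g. for
# buffered text-mode access over a binary Arvados file:
#
#     import io
#     wrapped = io.TextIOWrapper(WrappableFile(binary_reader), encoding='utf-8')
#     for line in wrapped:
#         process(line)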