1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
37 _logger = logging.getLogger('arvados.arvfile')
40 """split(path) -> streamname, filename
42 Separate the stream name and file name in a /-separated stream path and
43 return a tuple (stream_name, file_name). If no stream name is available,
48 stream_name, file_name = path.rsplit('/', 1)
49 except ValueError: # No / in string
50 stream_name, file_name = '.', path
51 return stream_name, file_name
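# For example, split("path/to/stream/file.txt") returns ("path/to/stream", "file.txt"),
# while split("file.txt"), which contains no "/", returns (".", "file.txt").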
54 class UnownedBlockError(Exception):
55 """Raised when there's a writable block without an owner on the BlockManager."""
59 class _FileLikeObjectBase(object):
60 def __init__(self, name, mode):
66 def _before_close(orig_func):
67 @functools.wraps(orig_func)
68 def before_close_wrapper(self, *args, **kwargs):
70 raise ValueError("I/O operation on closed stream file")
71 return orig_func(self, *args, **kwargs)
72 return before_close_wrapper
77 def __exit__(self, exc_type, exc_value, traceback):
88 class ArvadosFileReaderBase(_FileLikeObjectBase):
89 def __init__(self, name, mode, num_retries=None):
90 super(ArvadosFileReaderBase, self).__init__(name, mode)
91 self._binary = 'b' in mode
92 if sys.version_info >= (3, 0) and not self._binary:
93 raise NotImplementedError("text mode {!r} is not implemented".format(mode))
95 self.num_retries = num_retries
96 self._readline_cache = (None, None)
100 data = self.readline()
105 def decompressed_name(self):
106 return re.sub(r'\.(bz2|gz)$', '', self.name)
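# For example, "reads.fastq.gz" becomes "reads.fastq" and "table.csv.bz2" becomes
# "table.csv"; names without a .gz or .bz2 suffix are returned unchanged.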
108 @_FileLikeObjectBase._before_close
109 def seek(self, pos, whence=os.SEEK_SET):
110 if whence == os.SEEK_CUR:
112 elif whence == os.SEEK_END:
115 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
131 @_FileLikeObjectBase._before_close
133 def readall(self, size=2**20, num_retries=None):
135 data = self.read(size, num_retries=num_retries)
140 @_FileLikeObjectBase._before_close
142 def readline(self, size=float('inf'), num_retries=None):
143 cache_pos, cache_data = self._readline_cache
144 if self.tell() == cache_pos:
146 self._filepos += len(cache_data)
149 data_size = len(data[-1])
150 while (data_size < size) and (b'\n' not in data[-1]):
151 next_read = self.read(2 ** 20, num_retries=num_retries)
154 data.append(next_read)
155 data_size += len(next_read)
156 data = b''.join(data)
158 nextline_index = data.index(b'\n') + 1
160 nextline_index = len(data)
161 nextline_index = min(nextline_index, size)
162 self._filepos -= len(data) - nextline_index
163 self._readline_cache = (self.tell(), data[nextline_index:])
164 return data[:nextline_index].decode()
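# _readline_cache holds (file position just after the line that was returned,
# leftover bytes already read past that line). If the next readline() starts at
# exactly that position, the leftover bytes are reused instead of re-reading them.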
166 @_FileLikeObjectBase._before_close
168 def decompress(self, decompress, size, num_retries=None):
169 for segment in self.readall(size, num_retries=num_retries):
170 data = decompress(segment)
174 @_FileLikeObjectBase._before_close
176 def readall_decompressed(self, size=2**20, num_retries=None):
178 if self.name.endswith('.bz2'):
179 dc = bz2.BZ2Decompressor()
180 return self.decompress(dc.decompress, size,
181 num_retries=num_retries)
182 elif self.name.endswith('.gz'):
183 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
184 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
185 size, num_retries=num_retries)
187 return self.readall(size, num_retries=num_retries)
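# Illustrative usage sketch (the reader variable and process() are hypothetical,
# not part of this module): stream decompressed chunks without holding the whole
# file in memory:
#   for chunk in reader.readall_decompressed(size=2**20):
#       process(chunk)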
189 @_FileLikeObjectBase._before_close
191 def readlines(self, sizehint=float('inf'), num_retries=None):
194 for s in self.readall(num_retries=num_retries):
197 if data_size >= sizehint:
199 return b''.join(data).decode().splitlines(True)
202 raise IOError(errno.ENOSYS, "Not implemented")
204 def read(self, size, num_retries=None):
205 raise IOError(errno.ENOSYS, "Not implemented")
207 def readfrom(self, start, size, num_retries=None):
208 raise IOError(errno.ENOSYS, "Not implemented")
211 class StreamFileReader(ArvadosFileReaderBase):
212 class _NameAttribute(str):
213 # The Python file API provides a plain .name attribute.
214 # Older SDK provided a name() method.
215 # This class provides both, for maximum compatibility.
219 def __init__(self, stream, segments, name):
220 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
221 self._stream = stream
222 self.segments = segments
224 def stream_name(self):
225 return self._stream.name()
228 n = self.segments[-1]
229 return n.range_start + n.range_size
231 @_FileLikeObjectBase._before_close
233 def read(self, size, num_retries=None):
234 """Read up to 'size' bytes from the stream, starting at the current file position"""
239 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
241 lr = available_chunks[0]
242 data = self._stream.readfrom(lr.locator+lr.segment_offset,
244 num_retries=num_retries)
246 self._filepos += len(data)
249 @_FileLikeObjectBase._before_close
251 def readfrom(self, start, size, num_retries=None):
252 """Read up to 'size' bytes from the stream, starting at 'start'"""
257 for lr in locators_and_ranges(self.segments, start, size):
258 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
259 num_retries=num_retries))
260 return b''.join(data)
262 def as_manifest(self):
264 for r in self.segments:
265 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
266 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
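# The result is a single normalized manifest line for stream ".". Shape only
# (this locator is illustrative, the md5 of the 3-byte block "foo"):
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt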
269 def synchronized(orig_func):
270 @functools.wraps(orig_func)
271 def synchronized_wrapper(self, *args, **kwargs):
273 return orig_func(self, *args, **kwargs)
274 return synchronized_wrapper
277 class StateChangeError(Exception):
278 def __init__(self, message, state, nextstate):
279 super(StateChangeError, self).__init__(message)
281 self.nextstate = nextstate
283 class _BufferBlock(object):
284 """A stand-in for a Keep block that is in the process of being written.
286 Writers can append to it, get the size, and compute the Keep locator.
287 There are three valid states:
293 Block is in the process of being uploaded to Keep, append is an error.
296 The block has been written to Keep, its internal buffer has been
297 released, fetching the block will fetch it via keep client (since we
298 discarded the internal copy), and identifiers referring to the BufferBlock
299 can be replaced with the block locator.
309 def __init__(self, blockid, starting_capacity, owner):
312 the identifier for this block
315 the initial buffer capacity
318 ArvadosFile that owns this block
321 self.blockid = blockid
322 self.buffer_block = bytearray(starting_capacity)
323 self.buffer_view = memoryview(self.buffer_block)
324 self.write_pointer = 0
325 self._state = _BufferBlock.WRITABLE
328 self.lock = threading.Lock()
329 self.wait_for_commit = threading.Event()
333 def append(self, data):
334 """Append some data to the buffer.
336 Only valid if the block is in WRITABLE state. Implements an expanding
337 buffer, doubling capacity as needed to accommodate all the data.
340 if self._state == _BufferBlock.WRITABLE:
341 if not isinstance(data, bytes) and not isinstance(data, memoryview):
343 while (self.write_pointer+len(data)) > len(self.buffer_block):
344 new_buffer_block = bytearray(len(self.buffer_block) * 2)
345 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
346 self.buffer_block = new_buffer_block
347 self.buffer_view = memoryview(self.buffer_block)
348 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
349 self.write_pointer += len(data)
352 raise AssertionError("Buffer block is not writable")
354 STATE_TRANSITIONS = frozenset([
356 (PENDING, COMMITTED),
361 def set_state(self, nextstate, val=None):
362 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
363 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
364 self._state = nextstate
366 if self._state == _BufferBlock.PENDING:
367 self.wait_for_commit.clear()
369 if self._state == _BufferBlock.COMMITTED:
371 self.buffer_view = None
372 self.buffer_block = None
373 self.wait_for_commit.set()
375 if self._state == _BufferBlock.ERROR:
377 self.wait_for_commit.set()
384 """The amount of data written to the buffer."""
385 return self.write_pointer
389 """The Keep locator for this buffer's contents."""
390 if self._locator is None:
391 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
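# The locator has the Keep-style "<md5 hex digest>+<size in bytes>" form; for
# example, an empty buffer yields "d41d8cd98f00b204e9800998ecf8427e+0".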
395 def clone(self, new_blockid, owner):
396 if self._state == _BufferBlock.COMMITTED:
397 raise AssertionError("Cannot duplicate committed buffer block")
398 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
399 bufferblock.append(self.buffer_view[0:self.size()])
404 self._state = _BufferBlock.DELETED
406 self.buffer_block = None
407 self.buffer_view = None
410 def repack_writes(self):
411 """Optimize buffer block by repacking segments in file sequence.
413 When the client makes random writes, they appear in the buffer block in
414 the sequence they were written rather than the sequence they appear in
415 the file. This makes for inefficient, fragmented manifests. Attempt
416 to optimize by repacking writes in file sequence.
419 if self._state != _BufferBlock.WRITABLE:
420 raise AssertionError("Cannot repack non-writable block")
422 segs = self.owner.segments()
424 # Collect the segments that reference the buffer block.
425 bufferblock_segs = [s for s in segs if s.locator == self.blockid]
427 # Collect total data referenced by segments (could be smaller than
428 # bufferblock size if a portion of the file was written and
430 write_total = sum([s.range_size for s in bufferblock_segs])
432 if write_total < self.size() or len(bufferblock_segs) > 1:
433 # If there's more than one segment referencing this block, it is
434 # due to out-of-order writes and will produce a fragmented
435 # manifest, so try to optimize by re-packing into a new buffer.
436 contents = self.buffer_view[0:self.write_pointer].tobytes()
437 new_bb = _BufferBlock(None, write_total, None)
438 for t in bufferblock_segs:
439 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
440 t.segment_offset = new_bb.size() - t.range_size
442 self.buffer_block = new_bb.buffer_block
443 self.buffer_view = new_bb.buffer_view
444 self.write_pointer = new_bb.write_pointer
447 self.owner.set_segments(segs)
450 return "<BufferBlock %s>" % (self.blockid)
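# Rough lifecycle of a _BufferBlock (see STATE_TRANSITIONS and set_state() above
# for the authoritative rules):
#   WRITABLE --commit_bufferblock()--> PENDING --upload ok--> COMMITTED
#                                              \-upload failed-> ERROR
# plus DELETED once the block manager discards the buffer.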
453 class NoopLock(object):
457 def __exit__(self, exc_type, exc_value, traceback):
460 def acquire(self, blocking=False):
467 def must_be_writable(orig_func):
468 @functools.wraps(orig_func)
469 def must_be_writable_wrapper(self, *args, **kwargs):
470 if not self.writable():
471 raise IOError(errno.EROFS, "Collection is read-only.")
472 return orig_func(self, *args, **kwargs)
473 return must_be_writable_wrapper
476 class _BlockManager(object):
477 """BlockManager handles buffer blocks.
479 Also handles background block uploads, and background block prefetch for a
480 Collection of ArvadosFiles.
484 DEFAULT_PUT_THREADS = 2
485 DEFAULT_GET_THREADS = 2
487 def __init__(self, keep, copies=None, put_threads=None):
488 """keep: KeepClient object to use"""
490 self._bufferblocks = collections.OrderedDict()
491 self._put_queue = None
492 self._put_threads = None
493 self._prefetch_queue = None
494 self._prefetch_threads = None
495 self.lock = threading.Lock()
496 self.prefetch_enabled = True
498 self.num_put_threads = put_threads
500 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
501 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
503 self._pending_write_size = 0
504 self.threads_lock = threading.Lock()
505 self.padding_block = None
508 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
509 """Allocate a new, empty bufferblock in WRITABLE state and return it.
512 optional block identifier, otherwise one will be automatically assigned
515 optional capacity, otherwise will use default capacity
518 ArvadosFile that owns this block
521 return self._alloc_bufferblock(blockid, starting_capacity, owner)
523 def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
525 blockid = str(uuid.uuid4())
526 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
527 self._bufferblocks[bufferblock.blockid] = bufferblock
531 def dup_block(self, block, owner):
532 """Create a new bufferblock initialized with the content of an existing bufferblock.
535 the buffer block to copy.
538 ArvadosFile that owns the new block
541 new_blockid = str(uuid.uuid4())
542 bufferblock = block.clone(new_blockid, owner)
543 self._bufferblocks[bufferblock.blockid] = bufferblock
547 def is_bufferblock(self, locator):
548 return locator in self._bufferblocks
550 def _commit_bufferblock_worker(self):
551 """Background uploader thread."""
555 bufferblock = self._put_queue.get()
556 if bufferblock is None:
559 if self.copies is None:
560 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
562 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
563 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
564 except Exception as e:
565 bufferblock.set_state(_BufferBlock.ERROR, e)
567 if self._put_queue is not None:
568 self._put_queue.task_done()
570 def start_put_threads(self):
571 with self.threads_lock:
572 if self._put_threads is None:
573 # Start uploader threads.
575 # If we don't limit the Queue size, the upload queue can quickly
576 # grow to take up gigabytes of RAM if the writing process is
577 # generating data more quickly than it can be sent to the Keep
580 # With two upload threads and a queue size of 2, this means up to 4
581 # blocks pending. If they are full 64 MiB blocks, that means up to
582 # 256 MiB of internal buffering, which is the same size as the
583 # default download block cache in KeepClient.
584 self._put_queue = queue.Queue(maxsize=2)
586 self._put_threads = []
587 for i in range(0, self.num_put_threads):
588 thread = threading.Thread(target=self._commit_bufferblock_worker)
589 self._put_threads.append(thread)
593 def _block_prefetch_worker(self):
594 """The background downloader thread."""
597 b = self._prefetch_queue.get()
602 _logger.exception("Exception doing block prefetch")
605 def start_get_threads(self):
606 if self._prefetch_threads is None:
607 self._prefetch_queue = queue.Queue()
608 self._prefetch_threads = []
609 for i in range(0, self.num_get_threads):
610 thread = threading.Thread(target=self._block_prefetch_worker)
611 self._prefetch_threads.append(thread)
617 def stop_threads(self):
618 """Shut down and wait for background upload and download threads to finish."""
620 if self._put_threads is not None:
621 for t in self._put_threads:
622 self._put_queue.put(None)
623 for t in self._put_threads:
625 self._put_threads = None
626 self._put_queue = None
628 if self._prefetch_threads is not None:
629 for t in self._prefetch_threads:
630 self._prefetch_queue.put(None)
631 for t in self._prefetch_threads:
633 self._prefetch_threads = None
634 self._prefetch_queue = None
639 def __exit__(self, exc_type, exc_value, traceback):
643 def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
644 """Packs small blocks together before uploading"""
646 self._pending_write_size += closed_file_size
648 # Check whether there are enough small blocks to fill one block completely
649 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
652 # Find the blocks that are ready to be packed together before being
654 # A WRITABLE block always has an owner.
655 # A WRITABLE block with its owner.closed() implies that its
656 # size is <= KEEP_BLOCK_SIZE/2.
658 small_blocks = [b for b in listvalues(self._bufferblocks)
659 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
660 except AttributeError:
661 # Writable blocks without owner shouldn't exist.
662 raise UnownedBlockError()
664 if len(small_blocks) <= 1:
665 # Not enough small blocks for repacking
668 for bb in small_blocks:
671 # Update the pending write size count with its true value, just in case
672 # some small file was opened, written and closed several times.
673 self._pending_write_size = sum([b.size() for b in small_blocks])
675 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
678 new_bb = self._alloc_bufferblock()
681 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
682 bb = small_blocks.pop(0)
683 new_bb.owner.append(bb.owner)
684 self._pending_write_size -= bb.size()
685 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
686 files.append((bb, new_bb.write_pointer - bb.size()))
688 self.commit_bufferblock(new_bb, sync=sync)
690 for bb, new_bb_segment_offset in files:
691 newsegs = bb.owner.segments()
693 if s.locator == bb.blockid:
694 s.locator = new_bb.blockid
695 s.segment_offset = new_bb_segment_offset+s.segment_offset
696 bb.owner.set_segments(newsegs)
697 self._delete_bufferblock(bb.blockid)
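# Net effect of repack_small_blocks(): several small WRITABLE blocks belonging to
# already-closed files are concatenated into one new bufferblock (bounded by
# KEEP_BLOCK_SIZE), the owning files' segments are re-pointed at it with adjusted
# offsets, and the original small blocks are deleted.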
699 def commit_bufferblock(self, block, sync):
700 """Initiate a background upload of a bufferblock.
703 The block object to upload
706 If `sync` is True, upload the block synchronously.
707 If `sync` is False, upload the block asynchronously. This will
708 return immediately unless the upload queue is at capacity, in
709 which case it will wait on an upload queue slot.
713 # Mark the block as PENDING to disallow any further appends.
714 block.set_state(_BufferBlock.PENDING)
715 except StateChangeError as e:
716 if e.state == _BufferBlock.PENDING:
718 block.wait_for_commit.wait()
721 if block.state() == _BufferBlock.COMMITTED:
723 elif block.state() == _BufferBlock.ERROR:
730 if self.copies is None:
731 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
733 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
734 block.set_state(_BufferBlock.COMMITTED, loc)
735 except Exception as e:
736 block.set_state(_BufferBlock.ERROR, e)
739 self.start_put_threads()
740 self._put_queue.put(block)
743 def get_bufferblock(self, locator):
744 return self._bufferblocks.get(locator)
747 def get_padding_block(self):
748 """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
749 when using truncate() to extend the size of a file.
751 For reference (and possible future optimization), the md5sum of the
752 padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
756 if self.padding_block is None:
757 self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
758 self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
759 self.commit_bufferblock(self.padding_block, False)
760 return self.padding_block
763 def delete_bufferblock(self, locator):
764 self._delete_bufferblock(locator)
766 def _delete_bufferblock(self, locator):
767 bb = self._bufferblocks[locator]
769 del self._bufferblocks[locator]
771 def get_block_contents(self, locator, num_retries, cache_only=False):
774 First checks whether the locator refers to a BufferBlock and, if so, returns
775 that; otherwise passes the request through to KeepClient.get().
779 if locator in self._bufferblocks:
780 bufferblock = self._bufferblocks[locator]
781 if bufferblock.state() != _BufferBlock.COMMITTED:
782 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
784 locator = bufferblock._locator
786 return self._keep.get_from_cache(locator)
788 return self._keep.get(locator, num_retries=num_retries)
790 def commit_all(self):
791 """Commit all outstanding buffer blocks.
793 This is a synchronous call, and will not return until all buffer blocks
794 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
797 self.repack_small_blocks(force=True, sync=True)
800 items = listitems(self._bufferblocks)
803 if v.state() != _BufferBlock.COMMITTED and v.owner:
804 # Skip blocks whose owner is a list (produced by repacking small blocks):
805 # if they're not in COMMITTED state, they're already being committed asynchronously.
806 if isinstance(v.owner, ArvadosFile):
807 v.owner.flush(sync=False)
810 if self._put_queue is not None:
811 self._put_queue.join()
815 if v.state() == _BufferBlock.ERROR:
816 err.append((v.locator(), v.error))
818 raise KeepWriteError("Error writing some blocks", err, label="block")
821 # flush again with sync=True to remove committed bufferblocks from
824 if isinstance(v.owner, ArvadosFile):
825 v.owner.flush(sync=True)
826 elif isinstance(v.owner, list) and len(v.owner) > 0:
827 # This bufferblock is referenced by many files as a result
828 # of repacking small blocks, so don't delete it when flushing
829 # its owners, just do it after flushing them all.
830 for owner in v.owner:
831 owner.flush(sync=True)
832 self.delete_bufferblock(k)
834 def block_prefetch(self, locator):
835 """Initiate a background download of a block.
837 This assumes that the underlying KeepClient implements a block cache,
838 so repeated requests for the same block will not result in repeated
839 downloads (unless the block is evicted from the cache). This method
844 if not self.prefetch_enabled:
847 if self._keep.get_from_cache(locator) is not None:
851 if locator in self._bufferblocks:
854 self.start_get_threads()
855 self._prefetch_queue.put(locator)
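# ArvadosFile.readfrom() below feeds upcoming block locators into block_prefetch()
# so background downloads can warm the KeepClient cache ahead of sequential reads.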
858 class ArvadosFile(object):
859 """Represent a file in a Collection.
861 ArvadosFile manages the underlying representation of a file in Keep as a
862 sequence of segments spanning a set of blocks, and implements random
865 This object may be accessed from multiple threads.
869 __slots__ = ('parent', 'name', '_writers', '_committed',
870 '_segments', 'lock', '_current_bblock', 'fuse_entry')
872 def __init__(self, parent, name, stream=[], segments=[]):
874 ArvadosFile constructor.
877 a list of Range objects representing a block stream
880 a list of Range objects representing segments
884 self._writers = set()
885 self._committed = False
887 self.lock = parent.root_collection().lock
889 self._add_segment(stream, s.locator, s.range_size)
890 self._current_bblock = None
893 return self.parent.writable()
896 def permission_expired(self, as_of_dt=None):
897 """Returns True if any of the segments' locators has expired"""
898 for r in self._segments:
899 if KeepLocator(r.locator).permission_expired(as_of_dt):
905 return copy.copy(self._segments)
908 def clone(self, new_parent, new_name):
909 """Make a copy of this file."""
910 cp = ArvadosFile(new_parent, new_name)
911 cp.replace_contents(self)
916 def replace_contents(self, other):
917 """Replace segments of this file with segments from another `ArvadosFile` object."""
921 for other_segment in other.segments():
922 new_loc = other_segment.locator
923 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
924 if other_segment.locator not in map_loc:
925 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
926 if bufferblock.state() != _BufferBlock.WRITABLE:
927 map_loc[other_segment.locator] = bufferblock.locator()
929 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
930 new_loc = map_loc[other_segment.locator]
932 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
934 self.set_committed(False)
936 def __eq__(self, other):
939 if not isinstance(other, ArvadosFile):
942 othersegs = other.segments()
944 if len(self._segments) != len(othersegs):
946 for i in range(0, len(othersegs)):
947 seg1 = self._segments[i]
952 if self.parent._my_block_manager().is_bufferblock(loc1):
953 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
955 if other.parent._my_block_manager().is_bufferblock(loc2):
956 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
958 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
959 seg1.range_start != seg2.range_start or
960 seg1.range_size != seg2.range_size or
961 seg1.segment_offset != seg2.segment_offset):
966 def __ne__(self, other):
967 return not self.__eq__(other)
970 def set_segments(self, segs):
971 self._segments = segs
974 def set_committed(self, value=True):
975 """Set committed flag.
977 If value is True, set committed to be True.
979 If value is False, set committed to be False for this and all parents.
981 if value == self._committed:
983 self._committed = value
984 if self._committed is False and self.parent is not None:
985 self.parent.set_committed(False)
989 """Get whether this is committed or not."""
990 return self._committed
993 def add_writer(self, writer):
994 """Add an ArvadosFileWriter reference to the list of writers"""
995 if isinstance(writer, ArvadosFileWriter):
996 self._writers.add(writer)
999 def remove_writer(self, writer, flush):
1001 Called from ArvadosFileWriter.close(). Remove a writer reference from the list
1002 and do some block maintenance tasks.
1004 self._writers.remove(writer)
1006 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1007 # File writer closed, not small enough for repacking
1010 # All writers closed and size is adequate for repacking
1011 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1015 Get whether this is closed or not. When the writers list is empty, the file
1016 is considered closed.
1018 return len(self._writers) == 0
1022 def truncate(self, size):
1023 """Shrink or expand the size of the file.
1025 If `size` is less than the size of the file, the file contents after
1026 `size` will be discarded. If `size` is greater than the current size
1027 of the file, it will be filled with zero bytes.
1030 if size < self.size():
1032 for r in self._segments:
1033 range_end = r.range_start+r.range_size
1034 if r.range_start >= size:
1035 # segment is past the truncate size, all done
1037 elif size < range_end:
1038 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1039 nr.segment_offset = r.segment_offset
1045 self._segments = new_segs
1046 self.set_committed(False)
1047 elif size > self.size():
1048 padding = self.parent._my_block_manager().get_padding_block()
1049 diff = size - self.size()
1050 while diff > config.KEEP_BLOCK_SIZE:
1051 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1052 diff -= config.KEEP_BLOCK_SIZE
1054 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1055 self.set_committed(False)
1057 # size == self.size()
1060 def readfrom(self, offset, size, num_retries, exact=False):
1061 """Read up to `size` bytes from the file starting at `offset`.
1064 If False (default), return less data than requested if the read
1065 crosses a block boundary and the next block isn't cached. If True,
1066 only return less data than requested when hitting EOF.
1070 if size == 0 or offset >= self.size():
1072 readsegs = locators_and_ranges(self._segments, offset, size)
1073 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1078 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1080 blockview = memoryview(block)
1081 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1082 locs.add(lr.locator)
1087 if lr.locator not in locs:
1088 self.parent._my_block_manager().block_prefetch(lr.locator)
1089 locs.add(lr.locator)
1091 return b''.join(data)
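# With exact=False, once some data has been collected the loop above asks for
# further blocks with cache_only=True, so it stops at the first block that is not
# already cached; that is why this call may return less data than requested.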
1095 def writeto(self, offset, data, num_retries):
1096 """Write `data` to the file starting at `offset`.
1098 This will update existing bytes and/or extend the size of the file as
1102 if not isinstance(data, bytes) and not isinstance(data, memoryview):
1103 data = data.encode()
1107 if offset > self.size():
1108 self.truncate(offset)
1110 if len(data) > config.KEEP_BLOCK_SIZE:
1111 # Chunk it up into smaller writes
1113 dataview = memoryview(data)
1114 while n < len(data):
1115 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1116 n += config.KEEP_BLOCK_SIZE
1119 self.set_committed(False)
1121 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1122 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1124 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1125 self._current_bblock.repack_writes()
1126 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1127 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1128 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1130 self._current_bblock.append(data)
1132 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
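# replace_range() splices a reference to the bytes just appended (which start at
# write_pointer - len(data) within the current bufferblock) into this file's
# segment list at `offset`, replacing whatever previously covered that byte range.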
1134 self.parent.notify(WRITE, self.parent, self.name, (self, self))
1139 def flush(self, sync=True, num_retries=0):
1140 """Flush the current bufferblock to Keep.
1143 If True, commit block synchronously, wait until buffer block has been written.
1144 If False, commit block asynchronously, return immediately after putting block into
1147 if self.committed():
1150 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1151 if self._current_bblock.state() == _BufferBlock.WRITABLE:
1152 self._current_bblock.repack_writes()
1153 if self._current_bblock.state() != _BufferBlock.DELETED:
1154 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1158 for s in self._segments:
1159 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1161 if bb.state() != _BufferBlock.COMMITTED:
1162 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1163 to_delete.add(s.locator)
1164 s.locator = bb.locator()
1166 # Don't delete the bufferblock if it's owned by many files. It'll be
1167 # deleted after all of its owners are flush()ed.
1168 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1169 self.parent._my_block_manager().delete_bufferblock(s)
1171 self.parent.notify(MOD, self.parent, self.name, (self, self))
1175 def add_segment(self, blocks, pos, size):
1176 """Add a segment to the end of the file.
1178 `pos` and `size` reference a section of the stream described by
1179 `blocks` (a list of Range objects).
1182 self._add_segment(blocks, pos, size)
1184 def _add_segment(self, blocks, pos, size):
1185 """Internal implementation of add_segment."""
1186 self.set_committed(False)
1187 for lr in locators_and_ranges(blocks, pos, size):
1188 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1189 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1190 self._segments.append(r)
1194 """Get the file size."""
1196 n = self._segments[-1]
1197 return n.range_start + n.range_size
1202 def manifest_text(self, stream_name=".", portable_locators=False,
1203 normalize=False, only_committed=False):
1206 for segment in self._segments:
1207 loc = segment.locator
1208 if self.parent._my_block_manager().is_bufferblock(loc):
1211 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1212 if portable_locators:
1213 loc = KeepLocator(loc).stripped()
1214 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1215 segment.segment_offset, segment.range_size))
1216 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1222 def _reparent(self, newparent, newname):
1223 self.set_committed(False)
1224 self.flush(sync=True)
1225 self.parent.remove(self.name)
1226 self.parent = newparent
1228 self.lock = self.parent.root_collection().lock
1231 class ArvadosFileReader(ArvadosFileReaderBase):
1232 """Wraps ArvadosFile in a file-like object supporting reading only.
1234 Be aware that this class is NOT thread safe, as there is no locking around
1235 updates to the file pointer.
1239 def __init__(self, arvadosfile, mode="r", num_retries=None):
1240 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1241 self.arvadosfile = arvadosfile
1244 return self.arvadosfile.size()
1246 def stream_name(self):
1247 return self.arvadosfile.parent.stream_name()
1249 @_FileLikeObjectBase._before_close
1251 def read(self, size=None, num_retries=None):
1252 """Read up to `size` bytes from the file and return the result.
1254 Starts at the current file position. If `size` is None, read the
1255 entire remainder of the file.
1259 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1262 self._filepos += len(rd)
1263 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1264 return b''.join(data)
1266 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1267 self._filepos += len(data)
1270 @_FileLikeObjectBase._before_close
1272 def readfrom(self, offset, size, num_retries=None):
1273 """Read up to `size` bytes from the stream, starting at the specified file offset.
1275 This method does not change the file position.
1277 return self.arvadosfile.readfrom(offset, size, num_retries)
1283 class ArvadosFileWriter(ArvadosFileReader):
1284 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1286 Be aware that this class is NOT thread safe, as there is no locking around
1287 updates to the file pointer.
1291 def __init__(self, arvadosfile, mode, num_retries=None):
1292 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1293 self.arvadosfile.add_writer(self)
1298 @_FileLikeObjectBase._before_close
1300 def write(self, data, num_retries=None):
1301 if self.mode[0] == "a":
1302 self._filepos = self.size()
1303 self.arvadosfile.writeto(self._filepos, data, num_retries)
1304 self._filepos += len(data)
1307 @_FileLikeObjectBase._before_close
1309 def writelines(self, seq, num_retries=None):
1311 self.write(s, num_retries=num_retries)
1313 @_FileLikeObjectBase._before_close
1314 def truncate(self, size=None):
1316 size = self._filepos
1317 self.arvadosfile.truncate(size)
1319 @_FileLikeObjectBase._before_close
1321 self.arvadosfile.flush()
1323 def close(self, flush=True):
1325 self.arvadosfile.remove_writer(self, flush)
1326 super(ArvadosFileWriter, self).close()
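# Hedged end-to-end usage sketch. This assumes the higher-level
# arvados.collection.Collection API defined elsewhere in this SDK, which hands out
# ArvadosFileWriter / ArvadosFileReader objects from Collection.open():
#
#   import arvados.collection
#   coll = arvados.collection.Collection()
#   with coll.open("hello.txt", "wb") as f:   # f is an ArvadosFileWriter
#       f.write(b"hello world\n")             # buffered in a _BufferBlock
#   print(coll.manifest_text())               # typically commits buffered blocks first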