from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import listitems, listvalues
standard_library.install_aliases()
from builtins import range
from builtins import object

from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range
from .retry import retry_method

_logger = logging.getLogger('arvados.arvfile')
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name). If no stream name is available,
    assume '.'.
    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
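
# Example (illustrative paths, not from this file):
#   split('fastq/sample1.fastq') -> ('fastq', 'sample1.fastq')
#   split('sample1.fastq')       -> ('.', 'sample1.fastq')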
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
class _FileLikeObjectBase(object):
    def __init__(self, name, mode):

    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __exit__(self, exc_type, exc_value, traceback):


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._binary = 'b' in mode
        if sys.version_info >= (3, 0) and not self._binary:
            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
        self.num_retries = num_retries
        self._readline_cache = (None, None)

            data = self.readline()

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)
    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
        elif whence == os.SEEK_END:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")

    @_FileLikeObjectBase._before_close
    def readall(self, size=2**20, num_retries=None):
            data = self.read(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            self._filepos += len(cache_data)
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
            nextline_index = data.index(b'\n') + 1
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()
    @_FileLikeObjectBase._before_close
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)

    @_FileLikeObjectBase._before_close
    def readall_decompressed(self, size=2**20, num_retries=None):
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
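            # wbits=16+zlib.MAX_WBITS tells zlib to expect (and skip) a gzip
            # header and trailer rather than a raw zlib stream.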
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
            return self.readall(size, num_retries=num_retries)
    @_FileLikeObjectBase._before_close
    def readlines(self, sizehint=float('inf'), num_retries=None):
        for s in self.readall(num_retries=num_retries):
            if data_size >= sizehint:
        return b''.join(data).decode().splitlines(True)

        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")
class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""

        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
    @_FileLikeObjectBase._before_close
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""

        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
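
# Note: @synchronized assumes the wrapped method's instance exposes a
# `self.lock` context manager (e.g. threading.Lock, or the NoopLock below).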
class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.nextstate = nextstate
class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

      Block is in the process of being uploaded to Keep, append is an error.

      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.
    """

    def __init__(self, blockid, starting_capacity, owner):
          the identifier for this block

          the initial buffer capacity

          ArvadosFile that owns this block

        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE

        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state. Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.
        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
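            # e.g. a block created with starting_capacity=2**14 (16 KiB) that
            # receives a 100 KiB write doubles 16 -> 32 -> 64 -> 128 KiB
            # before the bytes are copied in.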
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
            (PENDING, COMMITTED),
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.wait_for_commit.set()

        """The amount of data written to the buffer."""
        return self.write_pointer

        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
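        # e.g. a buffer holding b'foo' (3 bytes) gets the locator
        # 'acbd18db4cc2f85cedef654fccc4a4d8+3': the md5 of the content
        # followed by its size in bytes.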
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])

        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __exit__(self, exc_type, exc_value, traceback):

    def acquire(self, blocking=False):
def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
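
# Usage sketch (hypothetical class, for illustration only): a method guarded
# by @must_be_writable raises when self.writable() returns False.
#
#   class Example(object):
#       def writable(self):
#           return False
#
#       @must_be_writable
#       def mutate(self):
#           pass
#
#   Example().mutate()  # raises IOError(errno.EROFS, "Collection is read-only.")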
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.
    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
            self.num_put_threads = put_threads
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

          optional block identifier, otherwise one will be automatically assigned

          optional capacity, otherwise will use default capacity

          ArvadosFile that owns this block
        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock

    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

          the buffer block to copy.

          ArvadosFile that owns the new block
        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock

    def is_bufferblock(self, locator):
        return locator in self._bufferblocks
    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
                bufferblock = self._put_queue.get()
                if bufferblock is None:

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
                if self._put_queue is not None:
                    self._put_queue.task_done()
    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending. If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
    def _block_prefetch_worker(self):
        """The background downloader thread."""
                b = self._prefetch_queue.get()
                _logger.exception("Exception doing block prefetch")

    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
            self._put_threads = None
            self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
            self._prefetch_threads = None
            self._prefetch_queue = None

    def __exit__(self, exc_type, exc_value, traceback):
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
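
        # Worked example (hypothetical sizes, assuming a 64 MiB
        # config.KEEP_BLOCK_SIZE): three closed 20 MiB files leave
        # _pending_write_size at 60 MiB, under the threshold, so nothing
        # happens; once a fourth closed file pushes the total past 64 MiB,
        # the small buffers are appended into one new bufferblock, each
        # owner's segments are re-pointed at it, and it is committed.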
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
                small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

          The block object to upload

          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously. This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.
        """
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                    block.wait_for_commit.wait()
                if block.state() == _BufferBlock.COMMITTED:
                elif block.state() == _BufferBlock.ERROR:

                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)

            self.start_put_threads()
            self._put_queue.put(block)
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
        """
        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):

        First checks to see if the locator is a BufferBlock and returns it if
        so; otherwise passes the request through to KeepClient.get().

            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                    locator = bufferblock._locator

            return self._keep.get_from_cache(locator)
            return self._keep.get(locator, num_retries=num_retries)
    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded. Raises KeepWriteError() if any blocks failed to upload.
        """
        self.repack_small_blocks(force=True, sync=True)

            items = listitems(self._bufferblocks)

            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        if self._put_queue is not None:
            self._put_queue.join()

                if v.state() == _BufferBlock.ERROR:
                    err.append((v.locator(), v.error))
                raise KeepWriteError("Error writing some blocks", err, label="block")

            # flush again with sync=True to remove committed bufferblocks from
                v.owner.flush(sync=True)
    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.) This method
        does not block.
        """
        if not self.prefetch_enabled:

        if self._keep.get_from_cache(locator) is not None:

            if locator in self._bufferblocks:

        self.start_get_threads()
        self._prefetch_queue.put(locator)
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.
    """

    def __init__(self, parent, name, stream=[], segments=[]):
        ArvadosFile constructor.

          a list of Range objects representing a block stream

          a list of Range objects representing segments

        self._writers = set()
        self._committed = False
        self.lock = parent.root_collection().lock
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        return self.parent.writable()

    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segments' locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
        return copy.copy(self._segments)

    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)

    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)
    def __eq__(self, other):
        if not isinstance(other, ArvadosFile):
            othersegs = other.segments()
        if len(self._segments) != len(othersegs):
        for i in range(0, len(othersegs)):
            seg1 = self._segments[i]
            if self.parent._my_block_manager().is_bufferblock(loc1):
                loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
            if other.parent._my_block_manager().is_bufferblock(loc2):
                loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
            if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                seg1.range_start != seg2.range_start or
                seg1.range_size != seg2.range_size or
                seg1.segment_offset != seg2.segment_offset):

    def __ne__(self, other):
        return not self.__eq__(other)
    def set_segments(self, segs):
        self._segments = segs

    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

        """Get whether this is committed or not."""
        return self._committed

    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    def remove_writer(self, writer, flush):
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        return len(self._writers) == 0
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded. If `size` is greater than the current size
        of the file, it will be filled with zero bytes.
        """
        if size < self.size():
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE

                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
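            # Worked example (hypothetical sizes, assuming a 64 MiB
            # config.KEEP_BLOCK_SIZE): extending an empty file to 130 MiB
            # appends two full 64 MiB padding segments, then one final 2 MiB
            # segment referencing the same padding block.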
            # size == self.size()
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached. If True,
         only return less data than requested when hitting EOF.
        """
        if size == 0 or offset >= self.size():
        readsegs = locators_and_ranges(self._segments, offset, size)
        prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
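        # Prefetch the segments covering the next KEEP_BLOCK_SIZE bytes past
        # this read (at most 32 of them) so the background downloaders can
        # warm the block cache.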
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)

            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)
    def _repack_writes(self, num_retries):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file. This makes for inefficient, fragmented manifests. Attempt
        to optimize by repacking writes in file sequence.
        """
        segs = self._segments

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.
        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
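        # The appended data occupies the last len(data) bytes of the buffer
        # block, which is why replace_range() above records the segment offset
        # write_pointer - len(data).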
        self.parent.notify(WRITE, self.parent, self.name, (self, self))
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the upload queue.
        """
        if self.committed():

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()

                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)
        """
        self._add_segment(blocks, pos, size)
    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

        """Get the file size."""
            n = self._segments[-1]
            return n.range_start + n.range_size
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.lock = self.parent.root_collection().lock
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.
    """
    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position. If `size` is None, read the
        entire remainder of the file.
        """
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.
    """
    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq, num_retries=None):
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
        self.arvadosfile.flush()

    def close(self, flush=True):
            self.arvadosfile.remove_writer(self, flush)
        super(ArvadosFileWriter, self).close()
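
# Minimal usage sketch (assumes arvados.collection.Collection, whose open()
# returns an ArvadosFileWriter for write modes; names are illustrative):
#
#   import arvados.collection
#   coll = arvados.collection.Collection()
#   with coll.open('subdir/hello.txt', 'wb') as f:
#       f.write(b'hello world\n')
#   print(coll.manifest_text())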