16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
# Module-level logger shared by the classes in this file.
25 _logger = logging.getLogger('arvados.arvfile')
def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        # rsplit keeps everything before the last '/' as the stream name.
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass
# NOTE(review): this chunk is a whitespace-mangled numbered listing — original
# indentation is stripped and several lines (49-53, 57, 61-64, 66+) are elided,
# so the class body below is incomplete as shown.  Code tokens kept verbatim.
47 class _FileLikeObjectBase(object):
# Base for file-like wrappers: records name/mode; _before_close guards methods
# against use after close (per the ValueError below).
48 def __init__(self, name, mode):
# Decorator: the wrapped method raises ValueError once the file is closed.
54 def _before_close(orig_func):
55 @functools.wraps(orig_func)
56 def before_close_wrapper(self, *args, **kwargs):
58 raise ValueError("I/O operation on closed stream file")
59 return orig_func(self, *args, **kwargs)
60 return before_close_wrapper
# Context-manager exit — presumably closes the file; body elided here, so
# TODO confirm against the full file.
65 def __exit__(self, exc_type, exc_value, traceback):
# NOTE(review): mangled numbered listing — indentation stripped and many lines
# elided, so method bodies below are incomplete as shown.  Code tokens are
# preserved verbatim.
76 class ArvadosFileReaderBase(_FileLikeObjectBase):
# Abstract base for readable file-like wrappers; concrete subclasses must
# implement read()/readfrom() (the base versions raise ENOSYS below).
77 def __init__(self, name, mode, num_retries=None):
78 super(ArvadosFileReaderBase, self).__init__(name, mode)
80 self.num_retries = num_retries
# (cache_pos, cache_data) used by readline() to resume from over-read bytes.
81 self._readline_cache = (None, None)
# Fragment of the line-iteration protocol (the surrounding def/loop lines are
# elided here) — presumably __iter__; TODO confirm against the full file.
85 data = self.readline()
# File name with a trailing .bz2/.gz extension removed.
90 def decompressed_name(self):
91 return re.sub('\.(bz2|gz)$', '', self.name)
93 @_FileLikeObjectBase._before_close
# Standard seek; a negative resulting offset is rejected with EINVAL below.
94 def seek(self, pos, whence=os.SEEK_SET):
95 if whence == os.SEEK_CUR:
97 elif whence == os.SEEK_END:
100 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
116 @_FileLikeObjectBase._before_close
# Yields successive chunks of up to `size` bytes via read().
118 def readall(self, size=2**20, num_retries=None):
120 data = self.read(size, num_retries=num_retries)
125 @_FileLikeObjectBase._before_close
# Read one line (up to `size` chars); bytes read past the newline are cached
# in _readline_cache so the next call can resume without re-reading.
127 def readline(self, size=float('inf'), num_retries=None):
128 cache_pos, cache_data = self._readline_cache
129 if self.tell() == cache_pos:
131 self._filepos += len(cache_data)
134 data_size = len(data[-1])
135 while (data_size < size) and ('\n' not in data[-1]):
136 next_read = self.read(2 ** 20, num_retries=num_retries)
139 data.append(next_read)
140 data_size += len(next_read)
143 nextline_index = data.index('\n') + 1
145 nextline_index = len(data)
146 nextline_index = min(nextline_index, size)
# Rewind the file position past whatever was read beyond the returned line.
147 self._filepos -= len(data) - nextline_index
148 self._readline_cache = (self.tell(), data[nextline_index:])
149 return data[:nextline_index]
151 @_FileLikeObjectBase._before_close
# Generator: run each readall() chunk through `decompress`, yielding output.
153 def decompress(self, decompress, size, num_retries=None):
154 for segment in self.readall(size, num_retries=num_retries):
155 data = decompress(segment)
159 @_FileLikeObjectBase._before_close
# Like readall(), but transparently decompresses .bz2/.gz files by extension.
161 def readall_decompressed(self, size=2**20, num_retries=None):
163 if self.name.endswith('.bz2'):
164 dc = bz2.BZ2Decompressor()
165 return self.decompress(dc.decompress, size,
166 num_retries=num_retries)
167 elif self.name.endswith('.gz'):
# 16+MAX_WBITS tells zlib to expect a gzip header/trailer.
168 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
169 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
170 size, num_retries=num_retries)
172 return self.readall(size, num_retries=num_retries)
174 @_FileLikeObjectBase._before_close
# Read the whole file (bounded by sizehint) and split into lines.
176 def readlines(self, sizehint=float('inf'), num_retries=None):
179 for s in self.readall(num_retries=num_retries):
182 if data_size >= sizehint:
184 return ''.join(data).splitlines(True)
# Abstract hooks: subclasses must override; the base versions raise ENOSYS.
187 raise IOError(errno.ENOSYS, "Not implemented")
189 def read(self, size, num_retries=None):
190 raise IOError(errno.ENOSYS, "Not implemented")
192 def readfrom(self, start, size, num_retries=None):
193 raise IOError(errno.ENOSYS, "Not implemented")
# NOTE(review): mangled numbered listing — indentation stripped, several lines
# elided (e.g. the size() def line, parts of read()/readfrom()).  Code tokens
# preserved verbatim.
196 class StreamFileReader(ArvadosFileReaderBase):
# Read-only view of one file within a manifest stream (self._stream).
197 class _NameAttribute(str):
198 # The Python file API provides a plain .name attribute.
199 # Older SDK provided a name() method.
200 # This class provides both, for maximum compatibility.
204 def __init__(self, stream, segments, name):
205 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
206 self._stream = stream
207 self.segments = segments
209 def stream_name(self):
210 return self._stream.name()
# size(): end of the last segment is the file length (def line elided).
213 n = self.segments[-1]
214 return n.range_start + n.range_size
216 @_FileLikeObjectBase._before_close
218 def read(self, size, num_retries=None):
219 """Read up to 'size' bytes from the stream, starting at the current file position"""
# Only the first matching chunk is fetched here; callers loop for more.
224 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
226 lr = available_chunks[0]
227 data = self._stream.readfrom(lr.locator+lr.segment_offset,
229 num_retries=num_retries)
231 self._filepos += len(data)
234 @_FileLikeObjectBase._before_close
236 def readfrom(self, start, size, num_retries=None):
237 """Read up to 'size' bytes from the stream, starting at 'start'"""
242 for lr in locators_and_ranges(self.segments, start, size):
243 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
244 num_retries=num_retries))
# Render this file as a single-file "." stream manifest line.
247 def as_manifest(self):
249 for r in self.segments:
250 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
251 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
def synchronized(orig_func):
    """Decorator: run the wrapped method while holding ``self.lock``.

    The decorated object must expose a ``lock`` attribute supporting the
    context-manager protocol (e.g. a ``threading.Lock`` or ``NoopLock``).
    The mangled listing elided the ``with self.lock:`` line (orig 257);
    without it the decorator would provide no synchronization at all, so it
    is restored here.
    """
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
class StateChangeError(Exception):
    """Raised by _BufferBlock.set_state for a disallowed state transition.

    Attributes:
      state: the block's state at the time of the failed transition.
      nextstate: the state that was requested.

    The mangled listing elided ``self.state = state`` (orig 265); callers
    (e.g. commit_bufferblock's ``e.state == _BufferBlock.PENDING`` check)
    require that attribute, so it is restored here.
    """
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate
# NOTE(review): mangled numbered listing — indentation stripped, lines elided
# (e.g. the WRITABLE/PENDING/COMMITTED/ERROR/DELETED state constants, parts of
# docstrings, def lines of size()/locator()/state()/clear()/__repr__).
# Code tokens preserved verbatim.
268 class _BufferBlock(object):
269 """A stand-in for a Keep block that is in the process of being written.
271 Writers can append to it, get the size, and compute the Keep locator.
272 There are three valid states:
278 Block is in the process of being uploaded to Keep, append is an error.
281 The block has been written to Keep, its internal buffer has been
282 released, fetching the block will fetch it via keep client (since we
283 discarded the internal copy), and identifiers referring to the BufferBlock
284 can be replaced with the block locator.
294 def __init__(self, blockid, starting_capacity, owner):
297 the identifier for this block
300 the initial buffer capacity
303 ArvadosFile that owns this block
306 self.blockid = blockid
307 self.buffer_block = bytearray(starting_capacity)
# memoryview lets append() write into the bytearray without copying.
308 self.buffer_view = memoryview(self.buffer_block)
309 self.write_pointer = 0
310 self._state = _BufferBlock.WRITABLE
313 self.lock = threading.Lock()
# Signalled when the block reaches COMMITTED or ERROR (see set_state below).
314 self.wait_for_commit = threading.Event()
318 def append(self, data):
319 """Append some data to the buffer.
321 Only valid if the block is in WRITABLE state. Implements an expanding
322 buffer, doubling capacity as needed to accommodate all the data.
325 if self._state == _BufferBlock.WRITABLE:
# Doubling strategy: grow until the pending write fits, copying once per step.
326 while (self.write_pointer+len(data)) > len(self.buffer_block):
327 new_buffer_block = bytearray(len(self.buffer_block) * 2)
328 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
329 self.buffer_block = new_buffer_block
330 self.buffer_view = memoryview(self.buffer_block)
331 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
332 self.write_pointer += len(data)
# NOTE: AssertionError here is the SDK's own class imported from .errors,
# not the builtin.
335 raise AssertionError("Buffer block is not writable")
# Legal state transitions; set_state rejects anything not listed.
337 STATE_TRANSITIONS = frozenset([
339 (PENDING, COMMITTED),
344 def set_state(self, nextstate, val=None):
345 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
346 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
347 self._state = nextstate
349 if self._state == _BufferBlock.PENDING:
350 self.wait_for_commit.clear()
352 if self._state == _BufferBlock.COMMITTED:
# Once committed the buffer is released; readers must fetch via Keep.
354 self.buffer_view = None
355 self.buffer_block = None
356 self.wait_for_commit.set()
358 if self._state == _BufferBlock.ERROR:
360 self.wait_for_commit.set()
# size() (def line elided):
367 """The amount of data written to the buffer."""
368 return self.write_pointer
# locator() (def line elided):
372 """The Keep locator for this buffer's contents."""
# md5-of-content "+" size, computed lazily and cached in _locator.
373 if self._locator is None:
374 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
378 def clone(self, new_blockid, owner):
379 if self._state == _BufferBlock.COMMITTED:
380 raise AssertionError("Cannot duplicate committed buffer block")
381 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
382 bufferblock.append(self.buffer_view[0:self.size()])
# clear() (def line elided): mark DELETED and drop buffer references so the
# memory can be reclaimed.
387 self._state = _BufferBlock.DELETED
389 self.buffer_block = None
390 self.buffer_view = None
# __repr__ (def line elided):
393 return "<BufferBlock %s>" % (self.blockid)
class NoopLock(object):
    """A do-nothing stand-in for ``threading.Lock``.

    Provides the context-manager and lock interfaces expected by the
    ``@synchronized`` decorator, but performs no synchronization — for use
    where locking is unnecessary.  (Bodies were elided from the mangled
    listing; reconstructed as no-ops, consistent with the class name.)
    """
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass
def must_be_writable(orig_func):
    """Decorator: raise IOError(EROFS) unless ``self.writable()`` is true.

    Applied to mutating methods so a read-only collection rejects writes
    before any state is touched.  (Reconstructed with standard formatting
    from the mangled listing; orig lines 410-416 are complete.)
    """
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
# NOTE(review): mangled numbered listing — indentation stripped, many lines
# elided (@synchronized decorators, else branches, returns, thread starts).
# Code tokens preserved verbatim.  Queue/xrange are Python 2-era idioms used
# throughout this file.
419 class _BlockManager(object):
420 """BlockManager handles buffer blocks.
422 Also handles background block uploads, and background block prefetch for a
423 Collection of ArvadosFiles.
427 DEFAULT_PUT_THREADS = 2
428 DEFAULT_GET_THREADS = 2
430 def __init__(self, keep, copies=None, put_threads=None):
431 """keep: KeepClient object to use"""
433 self._bufferblocks = collections.OrderedDict()
434 self._put_queue = None
435 self._put_threads = None
436 self._prefetch_queue = None
437 self._prefetch_threads = None
438 self.lock = threading.Lock()
439 self.prefetch_enabled = True
441 self.num_put_threads = put_threads
443 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
444 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
# Running total of bytes in closed-but-unflushed small files
# (see repack_small_blocks).
446 self._pending_write_size = 0
447 self.threads_lock = threading.Lock()
448 self.padding_block = None
451 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
452 """Allocate a new, empty bufferblock in WRITABLE state and return it.
455 optional block identifier, otherwise one will be automatically assigned
458 optional capacity, otherwise will use default capacity
461 ArvadosFile that owns this block
464 return self._alloc_bufferblock(blockid, starting_capacity, owner)
# Unsynchronized variant — presumably callers hold self.lock; confirm against
# the elided decorator lines.
466 def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
468 blockid = str(uuid.uuid4())
469 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
470 self._bufferblocks[bufferblock.blockid] = bufferblock
474 def dup_block(self, block, owner):
475 """Create a new bufferblock initialized with the content of an existing bufferblock.
478 the buffer block to copy.
481 ArvadosFile that owns the new block
484 new_blockid = str(uuid.uuid4())
485 bufferblock = block.clone(new_blockid, owner)
486 self._bufferblocks[bufferblock.blockid] = bufferblock
490 def is_bufferblock(self, locator):
491 return locator in self._bufferblocks
493 def _commit_bufferblock_worker(self):
494 """Background uploader thread."""
# A None sentinel on the queue tells the worker to exit (see stop_threads).
498 bufferblock = self._put_queue.get()
499 if bufferblock is None:
502 if self.copies is None:
503 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
505 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
506 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
508 except Exception as e:
# Record the failure on the block; commit_all() surfaces it later.
509 bufferblock.set_state(_BufferBlock.ERROR, e)
511 if self._put_queue is not None:
512 self._put_queue.task_done()
514 def start_put_threads(self):
515 with self.threads_lock:
# Lazily start the uploader pool exactly once (checked under threads_lock).
516 if self._put_threads is None:
517 # Start uploader threads.
519 # If we don't limit the Queue size, the upload queue can quickly
520 # grow to take up gigabytes of RAM if the writing process is
521 # generating data more quickly than it can be sent to the Keep
524 # With two upload threads and a queue size of 2, this means up to 4
525 # blocks pending. If they are full 64 MiB blocks, that means up to
526 # 256 MiB of internal buffering, which is the same size as the
527 # default download block cache in KeepClient.
528 self._put_queue = Queue.Queue(maxsize=2)
530 self._put_threads = []
531 for i in xrange(0, self.num_put_threads):
532 thread = threading.Thread(target=self._commit_bufferblock_worker)
533 self._put_threads.append(thread)
537 def _block_prefetch_worker(self):
538 """The background downloader thread."""
541 b = self._prefetch_queue.get()
# Prefetch failures are logged and swallowed — prefetch is best-effort.
546 _logger.exception("Exception doing block prefetch")
549 def start_get_threads(self):
550 if self._prefetch_threads is None:
551 self._prefetch_queue = Queue.Queue()
552 self._prefetch_threads = []
553 for i in xrange(0, self.num_get_threads):
554 thread = threading.Thread(target=self._block_prefetch_worker)
555 self._prefetch_threads.append(thread)
561 def stop_threads(self):
562 """Shut down and wait for background upload and download threads to finish."""
# One None sentinel per worker, then join each (join lines elided here).
564 if self._put_threads is not None:
565 for t in self._put_threads:
566 self._put_queue.put(None)
567 for t in self._put_threads:
569 self._put_threads = None
570 self._put_queue = None
572 if self._prefetch_threads is not None:
573 for t in self._prefetch_threads:
574 self._prefetch_queue.put(None)
575 for t in self._prefetch_threads:
577 self._prefetch_threads = None
578 self._prefetch_queue = None
# Context-manager exit — presumably calls stop_threads(); body elided.
583 def __exit__(self, exc_type, exc_value, traceback):
586 def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
587 """Packs small blocks together before uploading"""
588 self._pending_write_size += closed_file_size
590 # Check if there are enough small blocks for filling up one in full
591 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
594 # Search blocks ready for getting packed together before being committed to Keep.
595 # A WRITABLE block always has an owner.
596 # A WRITABLE block with its owner.closed() implies that its
597 # size is <= KEEP_BLOCK_SIZE/2.
599 bufferblocks = self._bufferblocks.values()
602 for b in bufferblocks:
603 if b.state() == _BufferBlock.WRITABLE and b.owner.closed():
604 b.owner._repack_writes(0)
605 except AttributeError:
606 # Writable blocks without owner shouldn't exist.
607 raise UnownedBlockError()
610 small_blocks = [b for b in self._bufferblocks.values()
611 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
613 if len(small_blocks) <= 1:
614 # Not enough small blocks for repacking
617 # Update the pending write size count with its true value, just in case
618 # some small file was opened, written and closed several times.
619 self._pending_write_size = sum([b.size() for b in small_blocks])
620 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
# Pack as many whole small blocks as fit into one full-size block.
623 new_bb = self._alloc_bufferblock()
625 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
626 bb = small_blocks.pop(0)
627 self._pending_write_size -= bb.size()
628 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
629 files.append((bb, new_bb.write_pointer - bb.size()))
631 self.commit_bufferblock(new_bb, sync=sync)
# Re-point each owner's segments at the packed block, then drop the old one.
633 for bb, new_bb_segment_offset in files:
635 for s in bb.owner.segments():
636 if s.locator == bb.blockid:
637 newsegs.append(Range(new_bb.locator(), s.range_start, s.range_size, new_bb_segment_offset+s.segment_offset))
640 bb.owner.set_segments(newsegs)
641 self._delete_bufferblock(bb.blockid)
643 def commit_bufferblock(self, block, sync):
644 """Initiate a background upload of a bufferblock.
647 The block object to upload
650 If `sync` is True, upload the block synchronously.
651 If `sync` is False, upload the block asynchronously. This will
652 return immediately unless the upload queue is at capacity, in
653 which case it will wait on an upload queue slot.
657 # Mark the block as PENDING so to disallow any more appends.
658 block.set_state(_BufferBlock.PENDING)
659 except StateChangeError as e:
# Already PENDING: another committer is uploading it; wait for the outcome.
660 if e.state == _BufferBlock.PENDING:
662 block.wait_for_commit.wait()
665 if block.state() == _BufferBlock.COMMITTED:
667 elif block.state() == _BufferBlock.ERROR:
# Synchronous path: put directly from this thread.
674 if self.copies is None:
675 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
677 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
678 block.set_state(_BufferBlock.COMMITTED, loc)
679 except Exception as e:
680 block.set_state(_BufferBlock.ERROR, e)
# Asynchronous path: hand the block to the uploader pool.
683 self.start_put_threads()
684 self._put_queue.put(block)
687 def get_bufferblock(self, locator):
688 return self._bufferblocks.get(locator)
691 def get_padding_block(self):
692 """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
693 when using truncate() to extend the size of a file.
695 For reference (and possible future optimization), the md5sum of the
696 padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
# The padding block is created once and shared by all files.
700 if self.padding_block is None:
701 self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
702 self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
703 self.commit_bufferblock(self.padding_block, False)
704 return self.padding_block
707 def delete_bufferblock(self, locator):
708 self._delete_bufferblock(locator)
710 def _delete_bufferblock(self, locator):
711 bb = self._bufferblocks[locator]
713 del self._bufferblocks[locator]
715 def get_block_contents(self, locator, num_retries, cache_only=False):
718 First checks to see if the locator is a BufferBlock and return that, if
719 not, passes the request through to KeepClient.get().
723 if locator in self._bufferblocks:
724 bufferblock = self._bufferblocks[locator]
725 if bufferblock.state() != _BufferBlock.COMMITTED:
726 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
# Committed: the buffer is gone, so substitute the real Keep locator.
728 locator = bufferblock._locator
730 return self._keep.get_from_cache(locator)
732 return self._keep.get(locator, num_retries=num_retries)
734 def commit_all(self):
735 """Commit all outstanding buffer blocks.
737 This is a synchronous call, and will not return until all buffer blocks
738 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
741 self.repack_small_blocks(force=True, sync=True)
744 items = self._bufferblocks.items()
747 if v.state() != _BufferBlock.COMMITTED and v.owner:
748 v.owner.flush(sync=False)
751 if self._put_queue is not None:
752 self._put_queue.join()
# Collect failed blocks and report them all in one KeepWriteError.
756 if v.state() == _BufferBlock.ERROR:
757 err.append((v.locator(), v.error))
759 raise KeepWriteError("Error writing some blocks", err, label="block")
762 # flush again with sync=True to remove committed bufferblocks from
765 v.owner.flush(sync=True)
767 def block_prefetch(self, locator):
768 """Initiate a background download of a block.
770 This assumes that the underlying KeepClient implements a block cache,
771 so repeated requests for the same block will not result in repeated
772 downloads (unless the block is evicted from the cache.) This method
# Skip work when prefetch is disabled, already cached, or a bufferblock.
777 if not self.prefetch_enabled:
780 if self._keep.get_from_cache(locator) is not None:
784 if locator in self._bufferblocks:
787 self.start_get_threads()
788 self._prefetch_queue.put(locator)
# NOTE(review): mangled numbered listing — indentation stripped, many lines
# elided (@synchronized/@must_be_writable decorators, else branches, returns,
# several def lines).  Code tokens preserved verbatim.
791 class ArvadosFile(object):
792 """Represent a file in a Collection.
794 ArvadosFile manages the underlying representation of a file in Keep as a
795 sequence of segments spanning a set of blocks, and implements random
798 This object may be accessed from multiple threads.
802 def __init__(self, parent, name, stream=[], segments=[]):
804 ArvadosFile constructor.
807 a list of Range objects representing a block stream
810 a list of Range objects representing segments
# NOTE(review): mutable default arguments ([]) are shared across calls; they
# appear to be only iterated here, but confirm before changing.
814 self._writers = set()
815 self._committed = False
# Lock shared with the root collection; @synchronized methods acquire it.
817 self.lock = parent.root_collection().lock
819 self._add_segment(stream, s.locator, s.range_size)
820 self._current_bblock = None
# writable(): delegate to the parent collection (def line elided).
823 return self.parent.writable()
826 def permission_expired(self, as_of_dt=None):
827 """Returns True if any of the segment's locators is expired"""
828 for r in self._segments:
829 if KeepLocator(r.locator).permission_expired(as_of_dt):
# segments(): shallow copy so callers cannot mutate our list (def elided).
835 return copy.copy(self._segments)
838 def clone(self, new_parent, new_name):
839 """Make a copy of this file."""
840 cp = ArvadosFile(new_parent, new_name)
841 cp.replace_contents(self)
846 def replace_contents(self, other):
847 """Replace segments of this file with segments from another `ArvadosFile` object."""
# map_loc caches the per-source-bufferblock decision so each block is
# duplicated (or resolved to its committed locator) at most once.
851 for other_segment in other.segments():
852 new_loc = other_segment.locator
853 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
854 if other_segment.locator not in map_loc:
855 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
856 if bufferblock.state() != _BufferBlock.WRITABLE:
857 map_loc[other_segment.locator] = bufferblock.locator()
859 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
860 new_loc = map_loc[other_segment.locator]
862 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
864 self.set_committed(False)
866 def __eq__(self, other):
869 if not isinstance(other, ArvadosFile):
872 othersegs = other.segments()
874 if len(self._segments) != len(othersegs):
# Compare segment-by-segment, resolving bufferblock ids to Keep locators.
876 for i in xrange(0, len(othersegs)):
877 seg1 = self._segments[i]
882 if self.parent._my_block_manager().is_bufferblock(loc1):
883 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
885 if other.parent._my_block_manager().is_bufferblock(loc2):
886 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
888 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
889 seg1.range_start != seg2.range_start or
890 seg1.range_size != seg2.range_size or
891 seg1.segment_offset != seg2.segment_offset):
896 def __ne__(self, other):
897 return not self.__eq__(other)
900 def set_segments(self, segs):
901 self._segments = segs
904 def set_committed(self, value=True):
905 """Set committed flag.
907 If value is True, set committed to be True.
909 If value is False, set committed to be False for this and all parents.
911 if value == self._committed:
913 self._committed = value
# Dirtiness propagates upward so the collection knows it must be saved.
914 if self._committed is False and self.parent is not None:
915 self.parent.set_committed(False)
# committed() getter (def line elided):
919 """Get whether this is committed or not."""
920 return self._committed
923 def add_writer(self, writer):
924 """Add an ArvadosFileWriter reference to the list of writers"""
925 if isinstance(writer, ArvadosFileWriter):
926 self._writers.add(writer)
929 def remove_writer(self, writer, flush):
931 Called from ArvadosFileWriter.close(). Remove a writer reference from the list
932 and do some block maintenance tasks.
934 self._writers.remove(writer)
936 if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
937 # File writer closed, not small enough for repacking
940 # All writers closed and size is adequate for repacking
941 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
# closed() (def line elided):
945 Get whether this is closed or not. When the writers list is empty, the file
946 is supposed to be closed.
948 return len(self._writers) == 0
952 def truncate(self, size):
953 """Shrink or expand the size of the file.
955 If `size` is less than the size of the file, the file contents after
956 `size` will be discarded. If `size` is greater than the current size
957 of the file, it will be filled with zero bytes.
960 if size < self.size():
962 for r in self._segments:
963 range_end = r.range_start+r.range_size
964 if r.range_start >= size:
965 # segment is past the truncate size, all done
967 elif size < range_end:
# Keep only the leading part of the segment straddling `size`.
968 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
969 nr.segment_offset = r.segment_offset
975 self._segments = new_segs
976 self.set_committed(False)
977 elif size > self.size():
# Grow by appending whole (then partial) spans of the shared padding block.
978 padding = self.parent._my_block_manager().get_padding_block()
979 diff = size - self.size()
980 while diff > config.KEEP_BLOCK_SIZE:
981 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
982 diff -= config.KEEP_BLOCK_SIZE
984 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
985 self.set_committed(False)
987 # size == self.size()
990 def readfrom(self, offset, size, num_retries, exact=False):
991 """Read up to `size` bytes from the file starting at `offset`.
994 If False (default), return less data than requested if the read
995 crosses a block boundary and the next block isn't cached. If True,
996 only return less data than requested when hitting EOF.
1000 if size == 0 or offset >= self.size():
1002 readsegs = locators_and_ranges(self._segments, offset, size)
# Prefetch up to 32 blocks covering the KEEP_BLOCK_SIZE span after the read.
1003 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
# cache_only after the first chunk (unless exact) so uncached blocks just
# end the read early instead of blocking.
1008 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1010 blockview = memoryview(block)
1011 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1012 locs.add(lr.locator)
1017 if lr.locator not in locs:
1018 self.parent._my_block_manager().block_prefetch(lr.locator)
1019 locs.add(lr.locator)
1021 return ''.join(data)
1023 def _repack_writes(self, num_retries):
1024 """Optimize buffer block by repacking segments in file sequence.
1026 When the client makes random writes, they appear in the buffer block in
1027 the sequence they were written rather than the sequence they appear in
1028 the file. This makes for inefficient, fragmented manifests. Attempt
1029 to optimize by repacking writes in file sequence.
1032 segs = self._segments
1034 # Collect the segments that reference the buffer block.
1035 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
1037 # Collect total data referenced by segments (could be smaller than
1038 # bufferblock size if a portion of the file was written and
1039 # then overwritten).
1040 write_total = sum([s.range_size for s in bufferblock_segs])
1042 if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
1043 # If there's more than one segment referencing this block, it is
1044 # due to out-of-order writes and will produce a fragmented
1045 # manifest, so try to optimize by re-packing into a new buffer.
1046 contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
1047 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
1048 for t in bufferblock_segs:
1049 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
1050 t.segment_offset = new_bb.size() - t.range_size
1051 self._current_bblock.clear()
1052 self._current_bblock = new_bb
1056 def writeto(self, offset, data, num_retries):
1057 """Write `data` to the file starting at `offset`.
1059 This will update existing bytes and/or extend the size of the file as
# Writing past EOF implicitly zero-pads via truncate().
1066 if offset > self.size():
1067 self.truncate(offset)
1069 if len(data) > config.KEEP_BLOCK_SIZE:
1070 # Chunk it up into smaller writes
1072 dataview = memoryview(data)
1073 while n < len(data):
1074 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1075 n += config.KEEP_BLOCK_SIZE
1078 self.set_committed(False)
1080 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1081 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
# If the current buffer would overflow, repack first; if still too big,
# commit it asynchronously and start a fresh buffer block.
1083 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1084 self._repack_writes(num_retries)
1085 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1086 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1087 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1089 self._current_bblock.append(data)
1091 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1093 self.parent.notify(WRITE, self.parent, self.name, (self, self))
1098 def flush(self, sync=True, num_retries=0):
1099 """Flush the current bufferblock to Keep.
1102 If True, commit block synchronously, wait until buffer block has been written.
1103 If False, commit block asynchronously, return immediately after putting block into
1106 if self.committed():
1109 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1110 if self._current_bblock.state() == _BufferBlock.WRITABLE:
1111 self._repack_writes(num_retries)
1112 if self._current_bblock.state() != _BufferBlock.DELETED:
1113 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
# Synchronous flush: resolve all segment locators to committed Keep locators.
1117 for s in self._segments:
1118 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1120 if bb.state() != _BufferBlock.COMMITTED:
1121 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1122 to_delete.add(s.locator)
1123 s.locator = bb.locator()
# The enclosing loop line is elided; `s` here presumably iterates the
# to_delete set of stale bufferblock ids — confirm against the full file.
1125 self.parent._my_block_manager().delete_bufferblock(s)
1127 self.parent.notify(MOD, self.parent, self.name, (self, self))
1131 def add_segment(self, blocks, pos, size):
1132 """Add a segment to the end of the file.
1134 `pos` and `offset` reference a section of the stream described by
1135 `blocks` (a list of Range objects)
1138 self._add_segment(blocks, pos, size)
1140 def _add_segment(self, blocks, pos, size):
1141 """Internal implementation of add_segment."""
1142 self.set_committed(False)
1143 for lr in locators_and_ranges(blocks, pos, size):
1144 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1145 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1146 self._segments.append(r)
# size() (def line elided): end of the last segment is the file length.
1150 """Get the file size."""
1152 n = self._segments[-1]
1153 return n.range_start + n.range_size
1158 def manifest_text(self, stream_name=".", portable_locators=False,
1159 normalize=False, only_committed=False):
1162 for segment in self.segments():
1163 loc = segment.locator
1164 if self.parent._my_block_manager().is_bufferblock(loc):
1167 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1168 if portable_locators:
1169 loc = KeepLocator(loc).stripped()
1170 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1171 segment.segment_offset, segment.range_size))
1172 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1178 def _reparent(self, newparent, newname):
1179 self.set_committed(False)
1180 self.flush(sync=True)
# Remove from the old parent before adopting the new one.
1181 self.parent.remove(self.name)
1182 self.parent = newparent
1184 self.lock = self.parent.root_collection().lock
# NOTE(review): mangled numbered listing — indentation stripped, lines elided
# (e.g. the size() def line, @retry_method decorators, loop/else lines in
# read()).  Code tokens preserved verbatim.
1187 class ArvadosFileReader(ArvadosFileReaderBase):
1188 """Wraps ArvadosFile in a file-like object supporting reading only.
1190 Be aware that this class is NOT thread safe as there is no locking around
1191 updating file pointer.
1195 def __init__(self, arvadosfile, num_retries=None):
1196 super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1197 self.arvadosfile = arvadosfile
# size(): delegate to the underlying ArvadosFile (def line elided).
1200 return self.arvadosfile.size()
1202 def stream_name(self):
1203 return self.arvadosfile.parent.stream_name()
1205 @_FileLikeObjectBase._before_close
1207 def read(self, size=None, num_retries=None):
1208 """Read up to `size` bytes from the file and return the result.
1210 Starts at the current file position. If `size` is None, read the
1211 entire remainder of the file.
# size=None path: read KEEP_BLOCK_SIZE chunks until EOF (loop lines partly
# elided here).
1215 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1218 self._filepos += len(rd)
1219 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1220 return ''.join(data)
# Bounded read: exact=True so short reads only happen at EOF.
1222 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1223 self._filepos += len(data)
1226 @_FileLikeObjectBase._before_close
1228 def readfrom(self, offset, size, num_retries=None):
1229 """Read up to `size` bytes from the stream, starting at the specified file offset.
1231 This method does not change the file position.
1233 return self.arvadosfile.readfrom(offset, size, num_retries)
# NOTE(review): mangled numbered listing — indentation stripped, lines elided
# (e.g. the mode assignment in __init__, @retry_method decorators, the else:
# in write(), the flush() def line).  Code tokens preserved verbatim.
1239 class ArvadosFileWriter(ArvadosFileReader):
1240 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1242 Be aware that this class is NOT thread safe as there is no locking around
1243 updating file pointer.
1247 def __init__(self, arvadosfile, mode, num_retries=None):
1248 super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
# Register with the file so it can track open writers (see closed()).
1250 self.arvadosfile.add_writer(self)
1255 @_FileLikeObjectBase._before_close
1257 def write(self, data, num_retries=None):
# Append mode writes at EOF; otherwise write at the file pointer and advance
# it (the else: line between 1259 and 1261 is elided here).
1258 if self.mode[0] == "a":
1259 self.arvadosfile.writeto(self.size(), data, num_retries)
1261 self.arvadosfile.writeto(self._filepos, data, num_retries)
1262 self._filepos += len(data)
1265 @_FileLikeObjectBase._before_close
1267 def writelines(self, seq, num_retries=None):
1269 self.write(s, num_retries=num_retries)
1271 @_FileLikeObjectBase._before_close
1272 def truncate(self, size=None):
# Default: truncate at the current file position (guard line elided).
1274 size = self._filepos
1275 self.arvadosfile.truncate(size)
1277 @_FileLikeObjectBase._before_close
# flush() (def line elided): push buffered data through to Keep.
1279 self.arvadosfile.flush()
1281 def close(self, flush=True):
# Deregister from the file (may trigger small-block repacking), then mark
# this wrapper closed via the base class.
1283 self.arvadosfile.remove_writer(self, flush)
1284 super(ArvadosFileWriter, self).close()