import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import Queue
import re
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')


def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
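
# Illustrative usage (an added example, not part of the original module):
#
#     split('data/logs/run.txt')  # -> ('data/logs', 'run.txt')
#     split('run.txt')            # -> ('.', 'run.txt')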


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0L:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
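
    # Illustrative sketch (an added example, not part of the original module):
    # readline() caches the unconsumed tail of the last chunk it read, keyed by
    # file position, so sequential line reads avoid re-reading the same data.
    #
    #     f.readline()  # reads up to 1 MiB, returns 'first line\n', caches the rest
    #     f.readline()  # cache position matches tell(), so no new read() needed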

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
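
    # Illustrative sketch (an added example, not part of the original module):
    # decompression is chosen purely by file name extension, so a reader named
    # 'table.csv.gz' is streamed through zlib while any other name is passed
    # through unchanged:
    #
    #     for chunk in reader.readall_decompressed():  # `reader` is hypothetical
    #         handle(chunk)                            # `handle` is hypothetical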

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
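
    # Illustrative output (an added example, not part of the original module):
    # for a 5-byte file 'hello.txt' whose only block holds the bytes 'hello',
    # as_manifest() returns a single normalized stream line such as:
    #
    #     . 5d41402abc4b2a76b9719d911017c592+5 0:5:hello.txt\n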


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")
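
    # Illustrative sketch (an added example, not part of the original module):
    # the buffer doubles until the new data fits, so appends are amortized O(1).
    #
    #     bb = _BufferBlock('x', starting_capacity=8, owner=None)
    #     bb.append(b'0123456789')  # capacity grows 8 -> 16; write_pointer == 10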

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)
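
    # Illustrative state walk-through (an added example, not part of the
    # original module), where `loc` stands for the locator returned by Keep:
    #
    #     bb = _BufferBlock(str(uuid.uuid4()), 2**14, owner=None)
    #     bb.append(b'data')                         # WRITABLE: appends allowed
    #     bb.set_state(_BufferBlock.PENDING)         # upload in progress
    #     bb.set_state(_BufferBlock.COMMITTED, loc)  # buffer released
    #     bb.append(b'more')                         # raises AssertionError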


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # service.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        bufferblocks = self._bufferblocks.values()
        try:
            for b in bufferblocks:
                if b.state() == _BufferBlock.WRITABLE and b.owner.closed():
                    b.owner._repack_writes(0)
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        small_blocks = [b for b in self._bufferblocks.values()
                        if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])
        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        for bb, new_bb_segment_offset in files:
            newsegs = []
            for s in bb.owner.segments():
                if s.locator == bb.blockid:
                    newsegs.append(Range(new_bb.locator(), s.range_start, s.range_size, new_bb_segment_offset+s.segment_offset))
                else:
                    newsegs.append(s)
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)
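
    # Illustrative sketch (an added example, not part of the original module):
    # two small files written and closed leave two underfull WRITABLE blocks;
    # forcing a repack concatenates them into one block and re-points each
    # owner's segments at the new block with the proper offset:
    #
    #     bm.repack_small_blocks(force=True, sync=True)  # `bm` is a _BlockManager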

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
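
    # Illustrative usage (an added example, not part of the original module),
    # assuming `bm` is a _BlockManager and `bb` a WRITABLE bufferblock:
    #
    #     bm.commit_bufferblock(bb, sync=True)   # blocks until the Keep PUT finishes
    #     bm.commit_bufferblock(bb, sync=False)  # queues the block for worker threads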

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)
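
    # Illustrative end-to-end sketch (an added example, not part of the
    # original module), assuming `keep` is a configured KeepClient; in normal
    # use the owner would be an ArvadosFile rather than None:
    #
    #     bm = _BlockManager(keep)
    #     bb = bm.alloc_bufferblock()
    #     bb.append(b'hello world')
    #     bm.commit_bufferblock(bb, sync=True)  # uploads; bb.locator() is now set
    #     bm.stop_threads()                     # shut down background workers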


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass
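
    # Illustrative sketch (an added example, not part of the original module),
    # assuming `af` is a writable ArvadosFile holding 10 bytes:
    #
    #     af.truncate(4)    # keeps only the first 4 bytes of existing segments
    #     af.truncate(100)  # appends a 96-byte segment referencing the shared
    #                       # zero-filled padding block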

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)

    def _repack_writes(self, num_retries):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        segs = self._segments

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size
            self._current_bblock.clear()
            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
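
    # Illustrative sketch (an added example, not part of the original module),
    # assuming `af` is a writable ArvadosFile:
    #
    #     af.writeto(0, 'x' * 70000000, num_retries=0)
    #
    # 70,000,000 bytes exceeds KEEP_BLOCK_SIZE (64 MiB), so the call recurses
    # into two smaller writeto() calls, each landing in its own bufferblock.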

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self.segments():
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf
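
    # Illustrative output (an added example, not part of the original module):
    # a one-segment file 'foo' containing the 3 bytes 'foo' in stream "." yields:
    #
    #     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n
    #
    # With portable_locators=True, permission hints are stripped from the
    # locators first.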

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)

        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
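
    # Illustrative round-trip (an added example, not part of the original
    # module), assuming `coll` is a writable arvados.collection.Collection:
    #
    #     with coll.open('out.txt', 'w') as f:  # yields an ArvadosFileWriter
    #         f.write('hello\n')                # buffered in a _BufferBlock
    #     # close() calls remove_writer(), which either flushes the file or
    #     # schedules its small block for repacking, depending on size.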