from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import range
from builtins import object

import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import sys
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
35 """split(path) -> streamname, filename
37 Separate the stream name and file name in a /-separated stream path and
38 return a tuple (stream_name, file_name). If no stream name is available,
43 stream_name, file_name = path.rsplit('/', 1)
44 except ValueError: # No / in string
45 stream_name, file_name = '.', path
46 return stream_name, file_name
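
# Example (illustrative): split() treats the last path component as the file
# name and everything before it as the stream name.
#
#   split("data/reads/sample1.fastq")  # -> ("data/reads", "sample1.fastq")
#   split("sample1.fastq")             # -> (".", "sample1.fastq")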


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._binary = 'b' in mode
        if sys.version_info >= (3, 0) and not self._binary:
            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0), self.size())
        return self._filepos

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
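
# The decorated method's instance is expected to provide a `lock` attribute
# supporting the context manager protocol (e.g. threading.Lock, or the
# NoopLock defined below).  A minimal sketch with a hypothetical class:
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def increment(self):
#           self.n += 1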


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block
        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
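
    # Illustration of the doubling strategy above: with the default starting
    # capacity of 2**14 bytes (see _BlockManager.alloc_bufferblock), appending
    # 100000 bytes grows the backing bytearray 16 KiB -> 32 KiB -> 64 KiB ->
    # 128 KiB, so n bytes written need at most O(log n) reallocations.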

    STATE_TRANSITIONS = frozenset([
        (WRITABLE, PENDING),
        (PENDING, COMMITTED),
        (PENDING, ERROR),
        (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state
366 """The amount of data written to the buffer."""
367 return self.write_pointer
371 """The Keep locator for this buffer's contents."""
372 if self._locator is None:
373 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
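
    # The computed locator has the standard Keep "<md5 hex digest>+<size>"
    # form; for example, an empty buffer yields
    # "d41d8cd98f00b204e9800998ecf8427e+0" (the MD5 of the empty string).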

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
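
    # Worked example of the repacking above: three closed files with writable
    # bufferblocks of 10 MiB, 20 MiB, and 30 MiB would be appended into one
    # new 60 MiB bufferblock, each file's segment rewritten as
    # Range(new_bb.blockid, 0, size, offset) with offsets 0, 10 MiB, and
    # 30 MiB respectively, and the combined block committed to Keep once.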

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
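
    # Typical call patterns, as exercised elsewhere in this module: writeto()
    # uses commit_bufferblock(block, sync=False) to queue a full block for the
    # uploader threads without stalling the writer, while flush(sync=True)
    # uploads in the calling thread and only returns once the block reaches
    # COMMITTED (or raises if it ends up in ERROR).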

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = list(self._bufferblocks.items())

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close().  Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
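
    # Sketch of how a large write flows through writeto(): a single 200 MiB
    # write is split into four recursive calls of at most
    # config.KEEP_BLOCK_SIZE (64 MiB) each; every sub-write lands in the
    # current bufferblock, which is handed to commit_bufferblock(sync=False)
    # whenever appending would push it past the 64 MiB block size limit.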

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)
        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1098 """Get the file size."""
1100 n = self._segments[-1]
1101 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf
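
    # For a one-segment file this produces a manifest line of the familiar
    # form, e.g.:
    #   . 930625b054ce894ac40596c3f5a0d947+33 0:33:example.txt
    # (stream name, block locator(s), then start:length:filename triples).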

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)
        self.mode = mode

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
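
# Typical usage, sketched under the assumption that these wrappers are
# obtained through the Collection API rather than constructed directly
# (Collection.open() returns an ArvadosFileWriter for write modes):
#
#   import arvados
#   c = arvados.collection.Collection()
#   with c.open('example.txt', 'wb') as f:   # ArvadosFileWriter
#       f.write(b'hello')
#   with c.open('example.txt', 'rb') as f:   # ArvadosFileReader
#       assert f.read(5) == b'hello'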