from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import listitems, listvalues
standard_library.install_aliases()
from builtins import range
from builtins import object

import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import sys
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')


def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
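
# Illustrative sketch: split() treats the last /-separated component as the
# file name and everything before it as the stream name.
#
#     split('data/reads/sample.fastq')  # -> ('data/reads', 'sample.fastq')
#     split('sample.fastq')             # -> ('.', 'sample.fastq')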


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._binary = 'b' in mode
        if sys.version_info >= (3, 0) and not self._binary:
            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()
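
    # Illustrative sketch of sequential use (hypothetical reader instance):
    # readline() returns one decoded line, and any bytes read past the
    # newline are kept in self._readline_cache so the next sequential
    # readline() call can reuse them instead of re-reading the block.
    #
    #     reader.seek(0)
    #     first = reader.readline()   # e.g. 'line one\n'
    #     second = reader.readline()  # served partly from the cache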

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
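
# Illustrative sketch: as_manifest() produces a single-stream manifest line
# in the usual Keep manifest format of stream name, block locators, and
# position:size:filename tokens, e.g. (hypothetical 3-byte file):
#
#     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt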


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
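
    # Illustrative sketch: with the default starting capacity of 2**14 bytes,
    # appending 100,000 bytes doubles the backing bytearray three times
    # (16 KiB -> 32 KiB -> 64 KiB -> 128 KiB) before the data fits.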

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)
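
# Illustrative sketch: a Keep locator is the MD5 hex digest of the block
# contents plus "+<size in bytes>", so an empty buffer block would yield
#
#     d41d8cd98f00b204e9800998ecf8427e+0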


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # service.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.locator()
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)
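
    # Illustrative sketch: with a 64 MiB KEEP_BLOCK_SIZE, three closed files
    # backed by 30 MiB buffer blocks are packed pairwise: the first two fit
    # in one new block (30 + 30 = 60 MiB), while adding the third would
    # overflow (90 MiB), so it is left for a later repacking pass.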

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block
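
    # Illustrative check (not executed here): the md5sum quoted in the
    # docstring follows from the padding block being 67108864 zero bytes:
    #
    #     import hashlib
    #     hashlib.md5(b'\0' * 67108864).hexdigest()
    #     # -> '7f614da9329cd3aebf59b91aadc30bf0'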

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = listitems(self._bufferblocks)

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass
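
    # Illustrative sketch: with a 64 MiB KEEP_BLOCK_SIZE, extending an empty
    # file to 130 MiB appends two full 64 MiB padding segments plus one 2 MiB
    # tail segment, all referencing the shared zero-filled padding block.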

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
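
    # Illustrative sketch: a single 200 MiB writeto() call is split into four
    # recursive writes (64 + 64 + 64 + 8 MiB) so that no individual append
    # exceeds config.KEEP_BLOCK_SIZE.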

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)
        self.mode = mode

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
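

# Illustrative usage sketch (assumes the arvados.collection.Collection API,
# which constructs these reader/writer objects via Collection.open()):
#
#     import arvados.collection
#     coll = arvados.collection.Collection()
#     with coll.open('stats.txt', 'wb') as f:   # an ArvadosFileWriter
#         f.write(b'hello\n')
#     with coll.open('stats.txt', 'rb') as f:   # an ArvadosFileReader
#         print(f.read(5))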