1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 from future.utils import listitems, listvalues
5 standard_library.install_aliases()
6 from builtins import range
7 from builtins import object
24 from .errors import KeepWriteError, AssertionError, ArgumentError
25 from .keep import KeepLocator
26 from ._normalize_stream import normalize_stream
27 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
28 from .retry import retry_method
33 _logger = logging.getLogger('arvados.arvfile')
36 """split(path) -> stream_name, file_name
38 Separate the stream name and file name in a /-separated stream path and
39 return a tuple (stream_name, file_name). If no stream name is available,
44 stream_name, file_name = path.rsplit('/', 1)
45 except ValueError: # No / in string
46 stream_name, file_name = '.', path
47 return stream_name, file_name
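# Illustrative usage of split() (example only; the paths below are hypothetical):
#
#   split('tmp/output/reads.fastq')  # -> ('tmp/output', 'reads.fastq')
#   split('reads.fastq')             # -> ('.', 'reads.fastq')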
50 class UnownedBlockError(Exception):
51 """Raised when there's a writable block without an owner on the BlockManager."""
55 class _FileLikeObjectBase(object):
56 def __init__(self, name, mode):
62 def _before_close(orig_func):
63 @functools.wraps(orig_func)
64 def before_close_wrapper(self, *args, **kwargs):
66 raise ValueError("I/O operation on closed stream file")
67 return orig_func(self, *args, **kwargs)
68 return before_close_wrapper
73 def __exit__(self, exc_type, exc_value, traceback):
84 class ArvadosFileReaderBase(_FileLikeObjectBase):
85 def __init__(self, name, mode, num_retries=None):
86 super(ArvadosFileReaderBase, self).__init__(name, mode)
87 self._binary = 'b' in mode
88 if sys.version_info >= (3, 0) and not self._binary:
89 raise NotImplementedError("text mode {!r} is not implemented".format(mode))
91 self.num_retries = num_retries
92 self._readline_cache = (None, None)
96 data = self.readline()
101 def decompressed_name(self):
102 return re.sub(r'\.(bz2|gz)$', '', self.name)
104 @_FileLikeObjectBase._before_close
105 def seek(self, pos, whence=os.SEEK_SET):
106 if whence == os.SEEK_CUR:
108 elif whence == os.SEEK_END:
111 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
127 @_FileLikeObjectBase._before_close
129 def readall(self, size=2**20, num_retries=None):
131 data = self.read(size, num_retries=num_retries)
136 @_FileLikeObjectBase._before_close
138 def readline(self, size=float('inf'), num_retries=None):
139 cache_pos, cache_data = self._readline_cache
140 if self.tell() == cache_pos:
142 self._filepos += len(cache_data)
145 data_size = len(data[-1])
146 while (data_size < size) and (b'\n' not in data[-1]):
147 next_read = self.read(2 ** 20, num_retries=num_retries)
150 data.append(next_read)
151 data_size += len(next_read)
152 data = b''.join(data)
154 nextline_index = data.index(b'\n') + 1
156 nextline_index = len(data)
157 nextline_index = min(nextline_index, size)
158 self._filepos -= len(data) - nextline_index
159 self._readline_cache = (self.tell(), data[nextline_index:])
160 return data[:nextline_index].decode()
162 @_FileLikeObjectBase._before_close
164 def decompress(self, decompress, size, num_retries=None):
165 for segment in self.readall(size, num_retries=num_retries):
166 data = decompress(segment)
170 @_FileLikeObjectBase._before_close
172 def readall_decompressed(self, size=2**20, num_retries=None):
174 if self.name.endswith('.bz2'):
175 dc = bz2.BZ2Decompressor()
176 return self.decompress(dc.decompress, size,
177 num_retries=num_retries)
178 elif self.name.endswith('.gz'):
179 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
180 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
181 size, num_retries=num_retries)
183 return self.readall(size, num_retries=num_retries)
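# Sketch of how the decompression generators above are meant to be consumed
# (illustrative only; `reader` and `handle_chunk` are hypothetical names):
#
#   for chunk in reader.readall_decompressed(size=2**20):
#       handle_chunk(chunk)   # yields decompressed bytes for .gz/.bz2 names,
#                             # otherwise plain readall() output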
185 @_FileLikeObjectBase._before_close
187 def readlines(self, sizehint=float('inf'), num_retries=None):
190 for s in self.readall(num_retries=num_retries):
193 if data_size >= sizehint:
195 return b''.join(data).decode().splitlines(True)
198 raise IOError(errno.ENOSYS, "Not implemented")
200 def read(self, size, num_retries=None):
201 raise IOError(errno.ENOSYS, "Not implemented")
203 def readfrom(self, start, size, num_retries=None):
204 raise IOError(errno.ENOSYS, "Not implemented")
207 class StreamFileReader(ArvadosFileReaderBase):
208 class _NameAttribute(str):
209 # The Python file API provides a plain .name attribute.
210 # The older SDK provided a name() method.
211 # This class provides both, for maximum compatibility.
215 def __init__(self, stream, segments, name):
216 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
217 self._stream = stream
218 self.segments = segments
220 def stream_name(self):
221 return self._stream.name()
224 n = self.segments[-1]
225 return n.range_start + n.range_size
227 @_FileLikeObjectBase._before_close
229 def read(self, size, num_retries=None):
230 """Read up to 'size' bytes from the stream, starting at the current file position"""
235 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
237 lr = available_chunks[0]
238 data = self._stream.readfrom(lr.locator+lr.segment_offset,
240 num_retries=num_retries)
242 self._filepos += len(data)
245 @_FileLikeObjectBase._before_close
247 def readfrom(self, start, size, num_retries=None):
248 """Read up to 'size' bytes from the stream, starting at 'start'"""
253 for lr in locators_and_ranges(self.segments, start, size):
254 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
255 num_retries=num_retries))
256 return b''.join(data)
258 def as_manifest(self):
260 for r in self.segments:
261 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
262 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
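# For reference, as_manifest() returns one normalized stream line of the form
# ". <locator> <pos>:<size>:<filename>\n"; for example (the locator and sizes
# below are made up):
#
#   . 930625b054ce894ac40596c3f5a0d947+33 0:33:reads.fastq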
265 def synchronized(orig_func):
266 @functools.wraps(orig_func)
267 def synchronized_wrapper(self, *args, **kwargs):
269 return orig_func(self, *args, **kwargs)
270 return synchronized_wrapper
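# Minimal sketch of how @synchronized is applied (assumes the decorated class
# exposes a `self.lock` usable as a context manager, as the classes below do):
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def bump(self):
#           self.n += 1   # runs while holding self.lock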
273 class StateChangeError(Exception):
274 def __init__(self, message, state, nextstate):
275 super(StateChangeError, self).__init__(message)
277 self.nextstate = nextstate
279 class _BufferBlock(object):
280 """A stand-in for a Keep block that is in the process of being written.
282 Writers can append to it, get the size, and compute the Keep locator.
283 There are three valid states:
289 The block is in the process of being uploaded to Keep; appending is an error.
292 The block has been written to Keep. Its internal buffer has been
293 released, so fetching the block goes through the Keep client (the
294 internal copy was discarded), and identifiers referring to the BufferBlock
295 can be replaced with the block locator.
305 def __init__(self, blockid, starting_capacity, owner):
308 the identifier for this block
311 the initial buffer capacity
314 ArvadosFile that owns this block
317 self.blockid = blockid
318 self.buffer_block = bytearray(starting_capacity)
319 self.buffer_view = memoryview(self.buffer_block)
320 self.write_pointer = 0
321 self._state = _BufferBlock.WRITABLE
324 self.lock = threading.Lock()
325 self.wait_for_commit = threading.Event()
329 def append(self, data):
330 """Append some data to the buffer.
332 Only valid if the block is in WRITABLE state. Implements an expanding
333 buffer, doubling capacity as needed to accommodate all the data.
336 if self._state == _BufferBlock.WRITABLE:
337 if not isinstance(data, bytes) and not isinstance(data, memoryview):
339 while (self.write_pointer+len(data)) > len(self.buffer_block):
340 new_buffer_block = bytearray(len(self.buffer_block) * 2)
341 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
342 self.buffer_block = new_buffer_block
343 self.buffer_view = memoryview(self.buffer_block)
344 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
345 self.write_pointer += len(data)
348 raise AssertionError("Buffer block is not writable")
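# Illustrative append behaviour (numbers are hypothetical): starting from a
# 2**14-byte buffer, appending 100000 bytes doubles the capacity until the
# data fits (16384 -> 32768 -> 65536 -> 131072) and then copies it in:
#
#   bb = _BufferBlock('bufferblock0', 2**14, owner=None)
#   bb.append(b'x' * 100000)
#   bb.size()              # -> 100000
#   len(bb.buffer_block)   # -> 131072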
350 STATE_TRANSITIONS = frozenset([
352 (PENDING, COMMITTED),
357 def set_state(self, nextstate, val=None):
358 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
359 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
360 self._state = nextstate
362 if self._state == _BufferBlock.PENDING:
363 self.wait_for_commit.clear()
365 if self._state == _BufferBlock.COMMITTED:
367 self.buffer_view = None
368 self.buffer_block = None
369 self.wait_for_commit.set()
371 if self._state == _BufferBlock.ERROR:
373 self.wait_for_commit.set()
380 """The amount of data written to the buffer."""
381 return self.write_pointer
385 """The Keep locator for this buffer's contents."""
386 if self._locator is None:
387 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
391 def clone(self, new_blockid, owner):
392 if self._state == _BufferBlock.COMMITTED:
393 raise AssertionError("Cannot duplicate committed buffer block")
394 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
395 bufferblock.append(self.buffer_view[0:self.size()])
400 self._state = _BufferBlock.DELETED
402 self.buffer_block = None
403 self.buffer_view = None
406 def repack_writes(self):
407 """Optimize buffer block by repacking segments in file sequence.
409 When the client makes random writes, they appear in the buffer block in
410 the sequence they were written rather than the sequence they appear in
411 the file. This makes for inefficient, fragmented manifests. Attempt
412 to optimize by repacking writes in file sequence.
415 if self._state != _BufferBlock.WRITABLE:
416 raise AssertionError("Cannot repack non-writable block")
418 segs = self.owner.segments()
420 # Collect the segments that reference the buffer block.
421 bufferblock_segs = [s for s in segs if s.locator == self.blockid]
423 # Collect total data referenced by segments (could be smaller than
424 # bufferblock size if a portion of the file was written and
426 write_total = sum([s.range_size for s in bufferblock_segs])
428 if write_total < self.size() or len(bufferblock_segs) > 1:
429 # If there's more than one segment referencing this block, it is
430 # due to out-of-order writes and will produce a fragmented
431 # manifest, so try to optimize by re-packing into a new buffer.
432 contents = self.buffer_view[0:self.write_pointer].tobytes()
433 new_bb = _BufferBlock(None, write_total, None)
434 for t in bufferblock_segs:
435 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
436 t.segment_offset = new_bb.size() - t.range_size
438 self.buffer_block = new_bb.buffer_block
439 self.buffer_view = new_bb.buffer_view
440 self.write_pointer = new_bb.write_pointer
443 self.owner.set_segments(segs)
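# Illustrative repack sketch (hypothetical offsets): suppose the tail of a file
# was written out of order, so the buffer holds B followed by A while the file
# reads A then B.  Before repacking, the two segments referencing this block
# are, in file order:
#
#   Range(blockid, range_start=0,      range_size=len(A), segment_offset=len(B))
#   Range(blockid, range_start=len(A), range_size=len(B), segment_offset=0)
#
# After repacking, the buffer holds A then B and the segment offsets become 0
# and len(A), so the block contributes one contiguous span to the manifest.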
446 return "<BufferBlock %s>" % (self.blockid)
449 class NoopLock(object):
453 def __exit__(self, exc_type, exc_value, traceback):
456 def acquire(self, blocking=False):
463 def must_be_writable(orig_func):
464 @functools.wraps(orig_func)
465 def must_be_writable_wrapper(self, *args, **kwargs):
466 if not self.writable():
467 raise IOError(errno.EROFS, "Collection is read-only.")
468 return orig_func(self, *args, **kwargs)
469 return must_be_writable_wrapper
472 class _BlockManager(object):
473 """BlockManager handles buffer blocks.
475 Also handles background block uploads and background block prefetch for a
476 Collection of ArvadosFiles.
480 DEFAULT_PUT_THREADS = 2
481 DEFAULT_GET_THREADS = 2
483 def __init__(self, keep, copies=None, put_threads=None):
484 """keep: KeepClient object to use"""
486 self._bufferblocks = collections.OrderedDict()
487 self._put_queue = None
488 self._put_threads = None
489 self._prefetch_queue = None
490 self._prefetch_threads = None
491 self.lock = threading.Lock()
492 self.prefetch_enabled = True
494 self.num_put_threads = put_threads
496 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
497 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
499 self._pending_write_size = 0
500 self.threads_lock = threading.Lock()
501 self.padding_block = None
504 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
505 """Allocate a new, empty bufferblock in WRITABLE state and return it.
508 optional block identifier, otherwise one will be automatically assigned
511 optional capacity, otherwise will use default capacity
514 ArvadosFile that owns this block
517 return self._alloc_bufferblock(blockid, starting_capacity, owner)
519 def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
521 blockid = str(uuid.uuid4())
522 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
523 self._bufferblocks[bufferblock.blockid] = bufferblock
527 def dup_block(self, block, owner):
528 """Create a new bufferblock initialized with the content of an existing bufferblock.
531 the buffer block to copy.
534 ArvadosFile that owns the new block
537 new_blockid = str(uuid.uuid4())
538 bufferblock = block.clone(new_blockid, owner)
539 self._bufferblocks[bufferblock.blockid] = bufferblock
543 def is_bufferblock(self, locator):
544 return locator in self._bufferblocks
546 def _commit_bufferblock_worker(self):
547 """Background uploader thread."""
551 bufferblock = self._put_queue.get()
552 if bufferblock is None:
555 if self.copies is None:
556 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
558 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
559 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
560 except Exception as e:
561 bufferblock.set_state(_BufferBlock.ERROR, e)
563 if self._put_queue is not None:
564 self._put_queue.task_done()
566 def start_put_threads(self):
567 with self.threads_lock:
568 if self._put_threads is None:
569 # Start uploader threads.
571 # If we don't limit the Queue size, the upload queue can quickly
572 # grow to take up gigabytes of RAM if the writing process is
573 generating data more quickly than it can be sent to the Keep
576 # With two upload threads and a queue size of 2, this means up to 4
577 # blocks pending. If they are full 64 MiB blocks, that means up to
578 # 256 MiB of internal buffering, which is the same size as the
579 # default download block cache in KeepClient.
580 self._put_queue = queue.Queue(maxsize=2)
582 self._put_threads = []
583 for i in range(0, self.num_put_threads):
584 thread = threading.Thread(target=self._commit_bufferblock_worker)
585 self._put_threads.append(thread)
589 def _block_prefetch_worker(self):
590 """The background downloader thread."""
593 b = self._prefetch_queue.get()
598 _logger.exception("Exception doing block prefetch")
601 def start_get_threads(self):
602 if self._prefetch_threads is None:
603 self._prefetch_queue = queue.Queue()
604 self._prefetch_threads = []
605 for i in range(0, self.num_get_threads):
606 thread = threading.Thread(target=self._block_prefetch_worker)
607 self._prefetch_threads.append(thread)
613 def stop_threads(self):
614 """Shut down and wait for background upload and download threads to finish."""
616 if self._put_threads is not None:
617 for t in self._put_threads:
618 self._put_queue.put(None)
619 for t in self._put_threads:
621 self._put_threads = None
622 self._put_queue = None
624 if self._prefetch_threads is not None:
625 for t in self._prefetch_threads:
626 self._prefetch_queue.put(None)
627 for t in self._prefetch_threads:
629 self._prefetch_threads = None
630 self._prefetch_queue = None
635 def __exit__(self, exc_type, exc_value, traceback):
639 def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
640 """Packs small blocks together before uploading"""
642 self._pending_write_size += closed_file_size
644 # Check if there are enough small blocks to fill up one full block
645 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
648 # Find blocks that are ready to be packed together before being
650 # A WRITABLE block always has an owner.
651 # A WRITABLE block with its owner.closed() implies that its
652 # size is <= KEEP_BLOCK_SIZE/2.
654 small_blocks = [b for b in listvalues(self._bufferblocks)
655 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
656 except AttributeError:
657 # Writable blocks without owner shouldn't exist.
658 raise UnownedBlockError()
660 if len(small_blocks) <= 1:
661 # Not enough small blocks for repacking
664 for bb in small_blocks:
667 # Update the pending write size count with its true value, just in case
668 # some small file was opened, written and closed several times.
669 self._pending_write_size = sum([b.size() for b in small_blocks])
671 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
674 new_bb = self._alloc_bufferblock()
677 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
678 bb = small_blocks.pop(0)
679 new_bb.owner.append(bb.owner)
680 self._pending_write_size -= bb.size()
681 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
682 files.append((bb, new_bb.write_pointer - bb.size()))
684 self.commit_bufferblock(new_bb, sync=sync)
686 for bb, new_bb_segment_offset in files:
687 newsegs = bb.owner.segments()
689 if s.locator == bb.blockid:
690 s.locator = new_bb.blockid
691 s.segment_offset = new_bb_segment_offset+s.segment_offset
692 bb.owner.set_segments(newsegs)
693 self._delete_bufferblock(bb.blockid)
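# Illustrative outcome (sizes are hypothetical): three closed 1 MiB files, each
# sitting in its own WRITABLE bufferblock, end up sharing one 3 MiB block; each
# file's segments are rewritten to point at the new block at offsets 0, 1 MiB
# and 2 MiB, and the three small bufferblocks are deleted.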
695 def commit_bufferblock(self, block, sync):
696 """Initiate a background upload of a bufferblock.
699 The block object to upload
702 If `sync` is True, upload the block synchronously.
703 If `sync` is False, upload the block asynchronously. This will
704 return immediately unless the upload queue is at capacity, in
705 which case it will wait on an upload queue slot.
709 # Mark the block as PENDING to disallow any further appends.
710 block.set_state(_BufferBlock.PENDING)
711 except StateChangeError as e:
712 if e.state == _BufferBlock.PENDING:
714 block.wait_for_commit.wait()
717 if block.state() == _BufferBlock.COMMITTED:
719 elif block.state() == _BufferBlock.ERROR:
726 if self.copies is None:
727 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
729 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
730 block.set_state(_BufferBlock.COMMITTED, loc)
731 except Exception as e:
732 block.set_state(_BufferBlock.ERROR, e)
735 self.start_put_threads()
736 self._put_queue.put(block)
739 def get_bufferblock(self, locator):
740 return self._bufferblocks.get(locator)
743 def get_padding_block(self):
744 """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
745 when using truncate() to extend the size of a file.
747 For reference (and possible future optimization), the md5sum of the
748 padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
752 if self.padding_block is None:
753 self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
754 self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
755 self.commit_bufferblock(self.padding_block, False)
756 return self.padding_block
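# Hedged sketch of how the padding block is consumed by ArvadosFile.truncate()
# below: extending a file by `diff` bytes appends segments that reference the
# all-zero padding block instead of allocating new data, e.g. (hypothetical
# sizes):
#
#   Range(padding.blockid, range_start=old_size, range_size=diff, segment_offset=0)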
759 def delete_bufferblock(self, locator):
760 self._delete_bufferblock(locator)
762 def _delete_bufferblock(self, locator):
763 bb = self._bufferblocks[locator]
765 del self._bufferblocks[locator]
767 def get_block_contents(self, locator, num_retries, cache_only=False):
770 First checks to see if the locator is a BufferBlock and returns that; if
771 not, passes the request through to KeepClient.get().
775 if locator in self._bufferblocks:
776 bufferblock = self._bufferblocks[locator]
777 if bufferblock.state() != _BufferBlock.COMMITTED:
778 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
780 locator = bufferblock._locator
782 return self._keep.get_from_cache(locator)
784 return self._keep.get(locator, num_retries=num_retries)
786 def commit_all(self):
787 """Commit all outstanding buffer blocks.
789 This is a synchronous call, and will not return until all buffer blocks
790 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
793 self.repack_small_blocks(force=True, sync=True)
796 items = listitems(self._bufferblocks)
799 if v.state() != _BufferBlock.COMMITTED and v.owner:
800 # Skip blocks owned by a list of files: if such a block is not yet in
801 # COMMITTED state, it is already being committed asynchronously.
802 if isinstance(v.owner, ArvadosFile):
803 v.owner.flush(sync=False)
806 if self._put_queue is not None:
807 self._put_queue.join()
811 if v.state() == _BufferBlock.ERROR:
812 err.append((v.locator(), v.error))
814 raise KeepWriteError("Error writing some blocks", err, label="block")
817 # flush again with sync=True to remove committed bufferblocks from
820 if isinstance(v.owner, ArvadosFile):
821 v.owner.flush(sync=True)
822 elif isinstance(v.owner, list) and len(v.owner) > 0:
823 # This bufferblock is referenced by many files as a result
824 # of repacking small blocks, so don't delete it when flushing
825 # its owners, just do it after flushing them all.
826 for owner in v.owner:
827 owner.flush(sync=True)
828 self.delete_bufferblock(k)
830 def block_prefetch(self, locator):
831 """Initiate a background download of a block.
833 This assumes that the underlying KeepClient implements a block cache,
834 so repeated requests for the same block will not result in repeated
835 downloads (unless the block is evicted from the cache). This method
840 if not self.prefetch_enabled:
843 if self._keep.get_from_cache(locator) is not None:
847 if locator in self._bufferblocks:
850 self.start_get_threads()
851 self._prefetch_queue.put(locator)
854 class ArvadosFile(object):
855 """Represent a file in a Collection.
857 ArvadosFile manages the underlying representation of a file in Keep as a
858 sequence of segments spanning a set of blocks, and implements random
861 This object may be accessed from multiple threads.
865 def __init__(self, parent, name, stream=[], segments=[]):
867 ArvadosFile constructor.
870 a list of Range objects representing a block stream
873 a list of Range objects representing segments
877 self._writers = set()
878 self._committed = False
880 self.lock = parent.root_collection().lock
882 self._add_segment(stream, s.locator, s.range_size)
883 self._current_bblock = None
886 return self.parent.writable()
889 def permission_expired(self, as_of_dt=None):
890 """Returns True if any of the segment's locators is expired"""
891 for r in self._segments:
892 if KeepLocator(r.locator).permission_expired(as_of_dt):
898 return copy.copy(self._segments)
901 def clone(self, new_parent, new_name):
902 """Make a copy of this file."""
903 cp = ArvadosFile(new_parent, new_name)
904 cp.replace_contents(self)
909 def replace_contents(self, other):
910 """Replace segments of this file with segments from another `ArvadosFile` object."""
914 for other_segment in other.segments():
915 new_loc = other_segment.locator
916 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
917 if other_segment.locator not in map_loc:
918 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
919 if bufferblock.state() != _BufferBlock.WRITABLE:
920 map_loc[other_segment.locator] = bufferblock.locator()
922 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
923 new_loc = map_loc[other_segment.locator]
925 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
927 self.set_committed(False)
929 def __eq__(self, other):
932 if not isinstance(other, ArvadosFile):
935 othersegs = other.segments()
937 if len(self._segments) != len(othersegs):
939 for i in range(0, len(othersegs)):
940 seg1 = self._segments[i]
945 if self.parent._my_block_manager().is_bufferblock(loc1):
946 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
948 if other.parent._my_block_manager().is_bufferblock(loc2):
949 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
951 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
952 seg1.range_start != seg2.range_start or
953 seg1.range_size != seg2.range_size or
954 seg1.segment_offset != seg2.segment_offset):
959 def __ne__(self, other):
960 return not self.__eq__(other)
963 def set_segments(self, segs):
964 self._segments = segs
967 def set_committed(self, value=True):
968 """Set committed flag.
970 If value is True, set committed to be True.
972 If value is False, set committed to be False for this and all parents.
974 if value == self._committed:
976 self._committed = value
977 if self._committed is False and self.parent is not None:
978 self.parent.set_committed(False)
982 """Get whether this is committed or not."""
983 return self._committed
986 def add_writer(self, writer):
987 """Add an ArvadosFileWriter reference to the list of writers"""
988 if isinstance(writer, ArvadosFileWriter):
989 self._writers.add(writer)
992 def remove_writer(self, writer, flush):
994 Called from ArvadosFileWriter.close(). Remove a writer reference from the list
995 and do some block maintenance tasks.
997 self._writers.remove(writer)
999 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1000 # File writer closed, not small enough for repacking
1003 # All writers closed and size is adequate for repacking
1004 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1008 Get whether this is closed or not. When the writers list is empty, the file
1009 is considered closed.
1011 return len(self._writers) == 0
1015 def truncate(self, size):
1016 """Shrink or expand the size of the file.
1018 If `size` is less than the size of the file, the file contents after
1019 `size` will be discarded. If `size` is greater than the current size
1020 of the file, it will be filled with zero bytes.
1023 if size < self.size():
1025 for r in self._segments:
1026 range_end = r.range_start+r.range_size
1027 if r.range_start >= size:
1028 # segment is past the truncate size, all done
1030 elif size < range_end:
1031 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1032 nr.segment_offset = r.segment_offset
1038 self._segments = new_segs
1039 self.set_committed(False)
1040 elif size > self.size():
1041 padding = self.parent._my_block_manager().get_padding_block()
1042 diff = size - self.size()
1043 while diff > config.KEEP_BLOCK_SIZE:
1044 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1045 diff -= config.KEEP_BLOCK_SIZE
1047 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1048 self.set_committed(False)
1050 # size == self.size()
1053 def readfrom(self, offset, size, num_retries, exact=False):
1054 """Read up to `size` bytes from the file starting at `offset`.
1057 If False (default), return less data than requested if the read
1058 crosses a block boundary and the next block isn't cached. If True,
1059 only return less data than requested when hitting EOF.
1063 if size == 0 or offset >= self.size():
1065 readsegs = locators_and_ranges(self._segments, offset, size)
1066 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1071 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1073 blockview = memoryview(block)
1074 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1075 locs.add(lr.locator)
1080 if lr.locator not in locs:
1081 self.parent._my_block_manager().block_prefetch(lr.locator)
1082 locs.add(lr.locator)
1084 return b''.join(data)
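# Reading sketch (illustrative; `af`, `offset` and `want` are hypothetical):
# with exact=False the call may return fewer than `size` bytes when it crosses
# into an uncached block, so callers loop until they have what they need:
#
#   data = b''
#   while len(data) < want:
#       chunk = af.readfrom(offset + len(data), want - len(data), num_retries=2)
#       if not chunk:
#           break          # EOF
#       data += chunk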
1088 def writeto(self, offset, data, num_retries):
1089 """Write `data` to the file starting at `offset`.
1091 This will update existing bytes and/or extend the size of the file as
1095 if not isinstance(data, bytes) and not isinstance(data, memoryview):
1096 data = data.encode()
1100 if offset > self.size():
1101 self.truncate(offset)
1103 if len(data) > config.KEEP_BLOCK_SIZE:
1104 # Chunk it up into smaller writes
1106 dataview = memoryview(data)
1107 while n < len(data):
1108 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1109 n += config.KEEP_BLOCK_SIZE
1112 self.set_committed(False)
1114 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1115 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1117 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1118 self._current_bblock.repack_writes()
1119 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1120 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1121 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1123 self._current_bblock.append(data)
1125 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1127 self.parent.notify(WRITE, self.parent, self.name, (self, self))
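# Write-path sketch (illustrative; `af` is a hypothetical writable ArvadosFile):
# writes larger than KEEP_BLOCK_SIZE are transparently chunked, and writes past
# the current end of file first truncate() out to `offset` with zero padding:
#
#   af.writeto(0, b'header', num_retries=2)
#   af.writeto(10 * 2**20, b'x' * (70 * 2**20), num_retries=2)  # split into <=64 MiB pieces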
1132 def flush(self, sync=True, num_retries=0):
1133 """Flush the current bufferblock to Keep.
1136 If True, commit block synchronously, wait until buffer block has been written.
1137 If False, commit block asynchronously, return immediately after putting block into
1140 if self.committed():
1143 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1144 if self._current_bblock.state() == _BufferBlock.WRITABLE:
1145 self._current_bblock.repack_writes()
1146 if self._current_bblock.state() != _BufferBlock.DELETED:
1147 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1151 for s in self._segments:
1152 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1154 if bb.state() != _BufferBlock.COMMITTED:
1155 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1156 to_delete.add(s.locator)
1157 s.locator = bb.locator()
1159 # Don't delete the bufferblock if it's owned by many files. It'll be
1160 # deleted after all of its owners are flush()ed.
1161 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1162 self.parent._my_block_manager().delete_bufferblock(s)
1164 self.parent.notify(MOD, self.parent, self.name, (self, self))
1168 def add_segment(self, blocks, pos, size):
1169 """Add a segment to the end of the file.
1171 `pos` and `size` reference a section of the stream described by
1172 `blocks` (a list of Range objects)
1175 self._add_segment(blocks, pos, size)
1177 def _add_segment(self, blocks, pos, size):
1178 """Internal implementation of add_segment."""
1179 self.set_committed(False)
1180 for lr in locators_and_ranges(blocks, pos, size):
1181 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1182 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1183 self._segments.append(r)
1187 """Get the file size."""
1189 n = self._segments[-1]
1190 return n.range_start + n.range_size
1195 def manifest_text(self, stream_name=".", portable_locators=False,
1196 normalize=False, only_committed=False):
1199 for segment in self._segments:
1200 loc = segment.locator
1201 if self.parent._my_block_manager().is_bufferblock(loc):
1204 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1205 if portable_locators:
1206 loc = KeepLocator(loc).stripped()
1207 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1208 segment.segment_offset, segment.range_size))
1209 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1215 def _reparent(self, newparent, newname):
1216 self.set_committed(False)
1217 self.flush(sync=True)
1218 self.parent.remove(self.name)
1219 self.parent = newparent
1221 self.lock = self.parent.root_collection().lock
1224 class ArvadosFileReader(ArvadosFileReaderBase):
1225 """Wraps ArvadosFile in a file-like object supporting reading only.
1227 Be aware that this class is NOT thread safe as there is no locking around
1228 updating the file pointer.
1232 def __init__(self, arvadosfile, mode="r", num_retries=None):
1233 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1234 self.arvadosfile = arvadosfile
1237 return self.arvadosfile.size()
1239 def stream_name(self):
1240 return self.arvadosfile.parent.stream_name()
1242 @_FileLikeObjectBase._before_close
1244 def read(self, size=None, num_retries=None):
1245 """Read up to `size` bytes from the file and return the result.
1247 Starts at the current file position. If `size` is None, read the
1248 entire remainder of the file.
1252 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1255 self._filepos += len(rd)
1256 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1257 return b''.join(data)
1259 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1260 self._filepos += len(data)
1263 @_FileLikeObjectBase._before_close
1265 def readfrom(self, offset, size, num_retries=None):
1266 """Read up to `size` bytes from the stream, starting at the specified file offset.
1268 This method does not change the file position.
1270 return self.arvadosfile.readfrom(offset, size, num_retries)
1276 class ArvadosFileWriter(ArvadosFileReader):
1277 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1279 Be aware that this class is NOT thread safe as there is no locking around
1280 updating the file pointer.
1284 def __init__(self, arvadosfile, mode, num_retries=None):
1285 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1286 self.arvadosfile.add_writer(self)
1291 @_FileLikeObjectBase._before_close
1293 def write(self, data, num_retries=None):
1294 if self.mode[0] == "a":
1295 self._filepos = self.size()
1296 self.arvadosfile.writeto(self._filepos, data, num_retries)
1297 self._filepos += len(data)
1300 @_FileLikeObjectBase._before_close
1302 def writelines(self, seq, num_retries=None):
1304 self.write(s, num_retries=num_retries)
1306 @_FileLikeObjectBase._before_close
1307 def truncate(self, size=None):
1309 size = self._filepos
1310 self.arvadosfile.truncate(size)
1312 @_FileLikeObjectBase._before_close
1314 self.arvadosfile.flush()
1316 def close(self, flush=True):
1318 self.arvadosfile.remove_writer(self, flush)
1319 super(ArvadosFileWriter, self).close()
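# End-to-end usage sketch (illustrative only; collection setup is outside this
# module, and `coll` below is a hypothetical arvados.collection.Collection):
#
#   with coll.open('results/out.txt', 'wb') as writer:   # ArvadosFileWriter
#       writer.write(b'hello\n')
#   with coll.open('results/out.txt', 'rb') as reader:   # ArvadosFileReader
#       print(reader.read(1024))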