from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import listitems, listvalues
standard_library.install_aliases()
from builtins import range
from builtins import object

import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import sys
import threading
import uuid
import zlib

from . import config
# Note: this intentionally shadows the builtin AssertionError with
# arvados.errors.AssertionError.
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name

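# Illustrative sketch (not part of the original module): how split() behaves
# on typical stream paths.
#
#     split('./foo/bar.txt')  # -> ('./foo', 'bar.txt')
#     split('baz.txt')        # -> ('.', 'baz.txt')
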
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._binary = 'b' in mode
        if sys.version_info >= (3, 0) and not self._binary:
            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

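    # Illustrative sketch (hypothetical reader `f`): readline() reads ahead in
    # 1 MiB chunks and caches everything after the newline, so a consecutive
    # call at the cached position is served without re-reading from Keep.
    #
    #     f.seek(0)
    #     first = f.readline()   # reads ahead, returns through the first '\n'
    #     second = f.readline()  # served from self._readline_cache
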
    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # wbits=16+MAX_WBITS tells zlib to expect a gzip header.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

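# Illustrative sketch: as_manifest() emits one normalized manifest line for
# this file in stream ".", e.g. (hypothetical 10-byte block):
#
#     . 781e5e245d69b566979b86e28d23f2c7+10 0:10:file.txt
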
def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    (Two more terminal states exist: ERROR, for a failed upload, and DELETED,
    for a buffer that has been discarded, e.g. after repacking.)

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

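    # Illustrative note: the computed locator is "<md5 hexdigest>+<size>"; for
    # buffer contents b'foo' it would be "acbd18db4cc2f85cedef654fccc4a4d8+3".
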
    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)

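# Illustrative sketch (hypothetical objects): the normal _BufferBlock life
# cycle, as driven by a _BlockManager `bm`:
#
#     bb = bm.alloc_bufferblock(owner=some_file)  # state WRITABLE
#     bb.append(b'hello')                         # expanding buffer
#     bm.commit_bufferblock(bb, sync=True)        # WRITABLE -> PENDING -> COMMITTED
#     loc = bb.locator()                          # permanent "<md5>+<size>" locator
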
class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self._repacked_bb = {}
        self._repacked_bb_lock = threading.Lock()

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
                with self._repacked_bb_lock:
                    # Check if this block was created by repacking smaller blocks
                    if bufferblock.blockid in self._repacked_bb:
                        # Update segment locators (with their permission tokens)
                        # of the files within this repacked block.
                        old_loc = self._repacked_bb[bufferblock.blockid]['unsigned_loc']
                        for f in self._repacked_bb[bufferblock.blockid]['files']:
                            for s in [x for x in f._segments if x.locator == old_loc]:
                                s.locator = loc
                        del self._repacked_bb[bufferblock.blockid]
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks to fill up one block in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search for blocks ready to get packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        # If this repacked block will be committed asynchronously, take note
        # of its files so their segments' locators will be updated with
        # the correct permission token returned by the API server.
        if not sync:
            with self._repacked_bb_lock:
                self._repacked_bb[new_bb.blockid] = {
                    'unsigned_loc': new_bb.locator(),
                    'files': [bb.owner for bb, _ in files],
                }

        self.commit_bufferblock(new_bb, sync=sync)

        with self._repacked_bb_lock:
            for bb, new_bb_segment_offset in files:
                newsegs = bb.owner.segments()
                for s in newsegs:
                    if s.locator == bb.blockid:
                        s.locator = new_bb.locator()
                        s.segment_offset = new_bb_segment_offset+s.segment_offset
                bb.owner.set_segments(newsegs)
                self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """
        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = listitems(self._bufferblocks)

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)

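    # Illustrative sketch (hypothetical locator `loc`): queue a prefetch ahead
    # of time so a later read is served from the KeepClient cache.
    #
    #     bm.block_prefetch(loc)                            # returns immediately
    #     data = bm.get_block_contents(loc, num_retries=2)  # likely a cache hit
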
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segments' locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close().  Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

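    # Illustrative sketch (hypothetical 10-byte file `af`): growing the file
    # appends segments that point into the shared all-zeros padding block.
    #
    #     af.truncate(100)  # af.size() == 100; bytes 10..99 read as b'\x00'
    #     af.truncate(10)   # af.size() == 10 again; padding segments dropped
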
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

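    # Illustrative sketch (hypothetical file `af`): with exact=False a read may
    # stop early at an uncached block boundary; exact=True is only short at EOF.
    #
    #     chunk = af.readfrom(0, 2**16, num_retries=2)              # may be short
    #     chunk = af.readfrom(0, 2**16, num_retries=2, exact=True)  # short only at EOF
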
    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

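    # Illustrative sketch (hypothetical file `af`): writes are appended to the
    # current buffer block and replace_range() splices the new segment into the
    # file map, so overwrites never mutate already-committed Keep blocks.
    #
    #     af.writeto(0, b'hello world', num_retries=2)
    #     af.writeto(6, b'there', num_retries=2)  # file now reads b'hello there'
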
    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

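    # Illustrative sketch (hypothetical signed locator): portable_locators=True
    # strips the permission hint so the manifest text is reproducible, e.g.
    #
    #     "acbd18db4cc2f85cedef654fccc4a4d8+3+Aabc123@12345678"
    #         -> "acbd18db4cc2f85cedef654fccc4a4d8+3"
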
    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    @_FileLikeObjectBase._before_close
    def flush(self):
        pass


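# Illustrative sketch (hypothetical objects): typical read-only access, assuming
# `coll` is an arvados.collection.Collection:
#
#     with coll.open('foo.txt', 'rb') as reader:  # an ArvadosFileReader
#         header = reader.read(1024)
#         reader.seek(0)
#         everything = reader.read()              # size=None reads to EOF
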
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
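
# Illustrative sketch (hypothetical objects): typical write path, assuming
# `coll` is a writable arvados.collection.Collection:
#
#     with coll.open('out.txt', 'wb') as writer:  # an ArvadosFileWriter
#         writer.write(b'hello\n')                # lands in a _BufferBlock
#     # close() calls remove_writer(), which flushes the file or queues its
#     # small block for repacking; coll.save_new() would commit the manifest.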