import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import Queue
import re
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

# Event types used with Collection.notify() below.
MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')

28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
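
# Illustrative sketch of split() (doctest-style): everything before the last
# '/' is the stream name, and the final component is the file name.
#
#     >>> split('data/logs/run.txt')
#     ('data/logs', 'run.txt')
#     >>> split('run.txt')
#     ('.', 'run.txt')
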
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper
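
    # Sketch of how the _before_close guard behaves (hypothetical subclass
    # instance; any _FileLikeObjectBase subclass works the same way): once
    # close() marks the object closed, every wrapped method raises instead of
    # touching the stream.
    #
    #     >>> f.close()
    #     >>> f.read(10)
    #     Traceback (most recent call last):
    #       ...
    #     ValueError: I/O operation on closed stream file
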
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)
    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data
    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)
    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos
    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)
    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)
    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
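
# Usage sketch for @synchronized: it assumes the decorated method's instance
# exposes a `lock` attribute supporting the context-manager protocol
# (threading.Lock, threading.RLock, or the NoopLock defined below).
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.Lock()
#             self.n = 0
#
#         @synchronized
#         def bump(self):
#             self.n += 1   # body runs with self.lock held
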
class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None
    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")
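
    # Capacity sketch for append(): the backing bytearray doubles until the
    # new data fits, so a block allocated with starting_capacity=4 grows
    # 4 -> 8 -> 16 when ten more bytes arrive; amortized cost per appended
    # byte stays constant.
    #
    #     bb = _BufferBlock('x', starting_capacity=4, owner=None)
    #     bb.append(b'abc')          # capacity 4, write_pointer 3
    #     bb.append(b'0123456789')   # capacity doubles to 16, write_pointer 13
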
    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])
    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
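
    # State-machine sketch (transitions as declared in STATE_TRANSITIONS
    # above): WRITABLE -> PENDING -> COMMITTED is the happy path, while
    # PENDING -> ERROR and ERROR -> PENDING allow a failed upload to be
    # retried.
    #
    #     bb.set_state(_BufferBlock.PENDING)          # ok: WRITABLE -> PENDING
    #     bb.set_state(_BufferBlock.COMMITTED, loc)   # ok: PENDING -> COMMITTED
    #     bb.set_state(_BufferBlock.WRITABLE)         # raises StateChangeError
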
356 """The amount of data written to the buffer."""
357 return self.write_pointer
361 """The Keep locator for this buffer's contents."""
362 if self._locator is None:
363 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock
    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks
    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()
    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()
    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")
    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()
    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()
    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
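
    # Repacking sketch: three closed 20 MiB files would otherwise produce
    # three separate Keep blocks.  With KEEP_BLOCK_SIZE = 64 MiB,
    # repack_small_blocks() copies all three buffers into one new bufferblock
    # and rewrites each file's single segment to point at its slice of it:
    #
    #     file A -> Range(new_bb.blockid, 0, 20 MiB, segment_offset=0)
    #     file B -> Range(new_bb.blockid, 0, 20 MiB, segment_offset=20 MiB)
    #     file C -> Range(new_bb.blockid, 0, 20 MiB, segment_offset=40 MiB)
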
    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
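
    # Commit sketch: the two modes as described in the docstring above.
    #
    #     manager.commit_bufferblock(bb, sync=True)    # blocks until bb is
    #                                                  # COMMITTED, or raises
    #     manager.commit_bufferblock(bb, sync=False)   # queues bb for the
    #                                                  # uploader threads and
    #                                                  # returns immediately
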
    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)
    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file."""

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block
    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]
    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)
    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)
    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)
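
    # Prefetch sketch: ArvadosFile.readfrom() below calls block_prefetch()
    # for the locators that follow the current read window, so a sequential
    # reader typically finds the next block already in the KeepClient cache.
    # (Hypothetical locator shown for illustration only.)
    #
    #     manager.block_prefetch('e4d909c290d0fb1ca068ffaddf22cbd0+4')

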
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
    def writable(self):
        return self.parent.writable()
    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)
    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp
    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)
    @synchronized
    def set_segments(self, segs):
        self._segments = segs
    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.

        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed
    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0
    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass
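
    # Truncate sketch: shrinking clips the segment list; growing appends
    # segments that point into the shared all-zeros padding block, so no new
    # data is written to Keep.
    #
    #     f.truncate(100)         # keep only the first 100 bytes
    #     f.truncate(10 * 2**20)  # extend to 10 MiB; the tail reads as zeros
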
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)
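
    # Read-semantics sketch: with exact=False a read that crosses into an
    # uncached block returns just the cached prefix, letting the caller
    # overlap I/O with prefetch; exact=True (used by ArvadosFileReader.read)
    # only returns short data at end-of-file.
    #
    #     f.readfrom(0, 2**20, num_retries=0)              # may return < 1 MiB
    #     f.readfrom(0, 2**20, num_retries=0, exact=True)  # < 1 MiB only at EOF
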
    def _repack_writes(self, num_retries):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        segs = self._segments

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]

        if len(bufferblock_segs) > 1:
            # Collect total data referenced by segments (could be smaller than
            # bufferblock size if a portion of the file was written and
            # then overwritten).
            write_total = sum([s.range_size for s in bufferblock_segs])

            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
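
    # Write-path sketch: a single writeto() larger than KEEP_BLOCK_SIZE is
    # split into block-sized recursive calls, so no bufferblock ever has to
    # hold more than 64 MiB:
    #
    #     f.writeto(0, b'x' * (130 * 2**20), num_retries=0)
    #     # -> writeto(0, 64 MiB) + writeto(64 MiB, 64 MiB)
    #     #    + writeto(128 MiB, 2 MiB)
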
    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))
    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1113 """Get the file size."""
1115 n = self._segments[-1]
1116 return n.range_start + n.range_size
    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf
    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)

        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)
    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)
    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)
    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()
    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
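
# End-to-end usage sketch (hypothetical `collection` object; the usual entry
# point is Collection.open() from arvados.collection rather than constructing
# these classes directly):
#
#     with collection.open('stats.txt', 'w') as f:  # yields ArvadosFileWriter
#         f.write('hello\n')                        # buffered in a _BufferBlock
#     # close() triggers remove_writer(), which flushes or schedules the
#     # small-block repacker; collection.save() commits the manifest.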