from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')

28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
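# A quick illustration of split() (doctest-style sketch, not part of the
# original module):
#
#     >>> split('the_stream/the_file.txt')
#     ('the_stream', 'the_file.txt')
#     >>> split('file_only.txt')     # no '/' present, stream defaults to '.'
#     ('.', 'file_only.txt')
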
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

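# Hedged sketch of how _before_close guards file-like methods. `DemoFile` is
# illustrative only, and the close() method (elided here) is assumed to set
# self.closed = True:
#
#     class DemoFile(_FileLikeObjectBase):
#         @_FileLikeObjectBase._before_close
#         def read(self):
#             return 'data'
#
#     f = DemoFile('demo.txt', 'r')
#     f.read()    # -> 'data'
#     f.close()
#     f.read()    # -> ValueError: I/O operation on closed stream file
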
class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0L:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

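    # Hedged usage sketch: streaming a gzip-compressed collection file through
    # readall_decompressed().  `reader` stands in for any ArvadosFileReaderBase
    # subclass whose name ends in '.gz'; the file names are illustrative.
    #
    #     with open('/tmp/copy.txt', 'wb') as out:
    #         for chunk in reader.readall_decompressed(size=2**20):
    #             out.write(chunk)
    #
    # The zlib.decompressobj(16+zlib.MAX_WBITS) form tells zlib to expect a
    # gzip header and trailer rather than a raw zlib stream.
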
    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper

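# Hedged sketch of the @synchronized contract: the decorated method's owner
# must expose a `lock` attribute supporting the context-manager protocol
# (a threading.Lock, or the NoopLock defined below).  `Counter` is
# illustrative only and not part of the SDK:
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.Lock()
#             self.n = 0
#
#         @synchronized
#         def incr(self):
#             self.n += 1   # runs while holding self.lock
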
class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
        (WRITABLE, PENDING),
        (PENDING, COMMITTED),
        (PENDING, ERROR),
        (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

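    # Hedged sketch of the state machine above (the locator string is the
    # real md5-of-'foo' locator; everything else is illustrative):
    #
    #     bb = _BufferBlock('bufferblock0', starting_capacity=2**14, owner=None)
    #     bb.append('foo')                      # WRITABLE: appends allowed
    #     bb.set_state(_BufferBlock.PENDING)    # upload in progress
    #     bb.set_state(_BufferBlock.COMMITTED,
    #                  'acbd18db4cc2f85cedef654fccc4a4d8+3')
    #     bb.set_state(_BufferBlock.PENDING)    # raises StateChangeError:
    #                                           # COMMITTED has no outgoing edge
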
367 """The amount of data written to the buffer."""
368 return self.write_pointer
372 """The Keep locator for this buffer's contents."""
373 if self._locator is None:
374 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
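    # The locator format is '<md5 hexdigest>+<size in bytes>'.  For a buffer
    # holding the three bytes 'foo' (sketch, not a doctest in the original):
    #
    #     >>> hashlib.md5('foo').hexdigest()
    #     'acbd18db4cc2f85cedef654fccc4a4d8'
    #
    # so locator() returns 'acbd18db4cc2f85cedef654fccc4a4d8+3'.
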
    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            files = []
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                files.append((bb, new_bb.write_pointer - bb.size()))

            self.commit_bufferblock(new_bb, sync=sync)

            for fn in files:
                bb = fn[0]
                bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), fn[1])])
                self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """
        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close().  Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

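    # Hedged sketch of extending a file with truncate(): growing past EOF
    # appends segments that reference the shared all-zero padding block
    # rather than writing real data.  `af` stands for a writable ArvadosFile:
    #
    #     af.writeto(0, 'abc', num_retries=0)
    #     af.truncate(10)     # file is now 10 bytes: 'abc' plus 7 zero bytes
    #     af.readfrom(0, 10, num_retries=0)
    #     # -> 'abc\x00\x00\x00\x00\x00\x00\x00'
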
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)

    def _repack_writes(self, num_retries):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        segs = self._segments

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

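    # Hedged illustration of the fragmentation _repack_writes() removes.
    # Writing out of file order, e.g.
    #
    #     af.writeto(3, 'DEF', num_retries=0)   # second half written first
    #     af.writeto(0, 'ABC', num_retries=0)
    #
    # leaves the buffer block holding 'DEFABC', referenced by two segments.
    # Repacking copies the referenced ranges into a fresh buffer in file
    # order ('ABCDEF'), so the file can be described by one contiguous
    # segment in the manifest.
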
    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

1134 """Get the file size."""
1136 n = self._segments[-1]
1137 return n.range_start + n.range_size
1142 def manifest_text(self, stream_name=".", portable_locators=False,
1143 normalize=False, only_committed=False):
1146 for segment in self.segments:
1147 loc = segment.locator
1148 if self.parent._my_block_manager().is_bufferblock(loc):
1151 loc = self._bufferblocks[loc].calculate_locator()
1152 if portable_locators:
1153 loc = KeepLocator(loc).stripped()
1154 filestream.append(LocatorAndRange(loc, locator_block_size(loc),
1155 segment.segment_offset, segment.range_size))
1156 buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
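    # For reference, a normalized manifest line for a 3-byte file 'foo.txt'
    # in the top-level stream looks like (illustrative):
    #
    #     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
    #
    # i.e. the stream name, one or more block locators, then file tokens of
    # the form <stream offset>:<length>:<name>.
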
    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
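
# Hedged end-to-end sketch: these wrappers are normally obtained from
# arvados.collection.Collection.open() rather than constructed directly
# (module and method names assumed from the wider SDK, not this file):
#
#     import arvados.collection
#
#     c = arvados.collection.Collection()
#     with c.open('foo.txt', 'w') as f:     # -> ArvadosFileWriter
#         f.write('hello')
#     with c.open('foo.txt', 'r') as f:     # -> ArvadosFileReader
#         print f.read(5)                   # -> 'hello'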