import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import re
import threading
import uuid
import zlib
import Queue

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')


28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
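# A minimal usage sketch: only the last "/" is significant, and a bare file
# name maps to the top-level stream '.':
#
#     >>> split('data/samples/a.fastq')
#     ('data/samples', 'a.fastq')
#     >>> split('a.fastq')
#     ('.', 'a.fastq')

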
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if not data:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

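    # Note on the decompressobj() argument above: a wbits value of
    # 16+zlib.MAX_WBITS tells zlib to expect a gzip header and trailer rather
    # than a raw zlib stream, which is why that branch handles '.gz' files.
    # A standalone sketch of the same trick:
    #
    #     import zlib
    #     gz_bytes = open('example.gz', 'rb').read()   # hypothetical input file
    #     dc = zlib.decompressobj(16 + zlib.MAX_WBITS)
    #     plain = dc.decompress(gz_bytes) + dc.flush()
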
    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")

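    # Growth sketch: with the default starting capacity of 2**14 bytes (see
    # _BlockManager.alloc_bufferblock below), appending 100000 bytes doubles
    # the buffer 16 KiB -> 32 KiB -> 64 KiB -> 128 KiB before copying, so
    # appends stay amortized O(1) instead of reallocating on every write.
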
    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

366 """The amount of data written to the buffer."""
367 return self.write_pointer
371 """The Keep locator for this buffer's contents."""
372 if self._locator is None:
373 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
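    # Locator format sketch: Keep locators are "<md5 of block data>+<size in
    # bytes>"; for example, an empty buffer would yield
    # d41d8cd98f00b204e9800998ecf8427e+0.
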
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper

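# Usage sketch: mutating methods in this module stack the two decorators, e.g.
#
#     @must_be_writable
#     @synchronized
#     def truncate(self, size):
#         ...
#
# so the read-only check runs first and the body then executes while holding
# self.lock (which synchronized() expects the instance to provide).

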
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)

            self.commit_bufferblock(new_bb, sync=sync)

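    # Worked sketch (hypothetical sizes): two closed files each holding a
    # 20 MiB WRITABLE block get appended into one new bufferblock; the first
    # file's segment becomes Range(new_bb.blockid, 0, 20 MiB, offset 0) and
    # the second's gets segment_offset 20 MiB, so one 40 MiB block is
    # committed to Keep instead of two half-empty ones.
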
    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """
        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

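    # Design note: the padding block is allocated and committed once per
    # BlockManager and then shared; every zero-filled region created by
    # truncate() points at this same 64 MiB block, so sparse files cost one
    # block write no matter how many files reference the padding.
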
    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.

        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

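    # Padding arithmetic sketch: extending an empty file to 100 MiB appends
    # one full 64 MiB padding segment (leaving diff = 36 MiB) and then one
    # final 36 MiB segment, both pointing into the shared zero-filled padding
    # block rather than newly written data.
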
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)

    def _repack_writes(self, num_retries):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        segs = self._segments

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

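    # Illustration: writing bytes 100-199 of a file first and bytes 0-99
    # second leaves the buffer holding the data in write order (the second
    # write lands at buffer offset 100). Repacking walks the segments in file
    # order and copies just the referenced ranges into a fresh buffer, so each
    # file segment points at a contiguous run and the manifest needs fewer
    # tokens.
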
    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

1128 """Get the file size."""
1130 n = self._segments[-1]
1131 return n.range_start + n.range_size
    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()

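
# Hedged usage sketch: these classes are normally obtained through
# arvados.collection.Collection rather than constructed directly. Assuming a
# writable collection:
#
#     import arvados.collection
#     c = arvados.collection.Collection()
#     with c.open('dir/file.txt', 'w') as f:  # an ArvadosFileWriter
#         f.write('hello\n')                  # buffered in a _BufferBlock
#     print c.manifest_text()                 # blocks flushed to Keep on close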