from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

_logger = logging.getLogger('arvados.arvfile')
28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
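
# A usage sketch for split() (hypothetical paths, not from the original file):
#
#     >>> split('data/reads.fastq')
#     ('data', 'reads.fastq')
#     >>> split('reads.fastq')
#     ('.', 'reads.fastq')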


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
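
    # Illustrative note on the cache above: bytes read past the returned
    # newline are stashed in self._readline_cache together with the file
    # position, so an immediately following readline() at that position
    # reuses them instead of re-reading from the stream.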

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
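
    # Illustrative behavior (hypothetical names): for a file named 'log.gz'
    # this yields decompressed chunks via zlib; for 'log.bz2' it uses bz2;
    # for any other name it falls back to plain readall().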

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are five valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      The block upload failed; the error is recorded and waiters are released.

    DELETED
      The block's buffer has been released and the block can no longer be used.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")
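
    # Illustrative capacity growth (hypothetical numbers): a block created
    # with 16 KiB of capacity that receives a 100 KiB append doubles
    # 16 KiB -> 32 KiB -> 64 KiB -> 128 KiB before copying the data in.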

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
367 """The amount of data written to the buffer."""
368 return self.write_pointer
372 """The Keep locator for this buffer's contents."""
373 if self._locator is None:
374 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
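
    # For example, a buffer holding b'foo' would yield the locator
    # 'acbd18db4cc2f85cedef654fccc4a4d8+3' (md5 hex digest, then '+', then
    # the size in bytes).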

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            files = []
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                files.append((bb, new_bb.write_pointer - bb.size()))

            self.commit_bufferblock(new_bb, sync=sync)

            for bb, segment_offset in files:
                bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), segment_offset)])
                self._delete_bufferblock(bb.blockid)
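
    # Illustrative repack (hypothetical sizes): three closed 1 MiB files, each
    # still in its own WRITABLE bufferblock, are copied back-to-back into one
    # new 3 MiB block; each file's segment list is then rewritten to point at
    # (new_bb.locator(), segment_offset) and the old blocks are deleted.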

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that,
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segments' locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                        seg1.range_start != seg2.range_start or
                        seg1.range_size != seg2.range_size or
                        seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close().  Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is considered closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    @synchronized
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        if size == 0 or offset >= self.size():
            return ''
        readsegs = locators_and_ranges(self._segments, offset, size)
        prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)

    def _repack_writes(self, num_retries):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        segs = self._segments

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
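
    # Illustrative fragmentation (hypothetical offsets): writing file bytes
    # 100-199 first and bytes 0-99 second stores them in that order in the
    # buffer block, so two segments must point into it; repacking copies the
    # ranges back in file order, letting the manifest describe them compactly.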

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
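
    # Illustrative chunking (hypothetical sizes): a single 150 MiB write with
    # KEEP_BLOCK_SIZE = 64 MiB is re-issued as three writeto() calls of
    # 64 MiB, 64 MiB and 22 MiB, so no single append exceeds one Keep block.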

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block
          into the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1132 """Get the file size."""
1134 n = self._segments[-1]
1135 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
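
    # Usage sketch (hypothetical objects): given an ArvadosFile `af`,
    #
    #     reader = ArvadosFileReader(af)
    #     head = reader.read(1024)        # advances the file pointer
    #     tag  = reader.readfrom(0, 16)   # does not move the pointer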


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
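

# Usage sketch (hypothetical objects; assumes `af` is a writable ArvadosFile
# belonging to a collection):
#
#     writer = ArvadosFileWriter(af, 'ab')   # append mode
#     writer.write('hello\n')                # buffered in a _BufferBlock
#     writer.close()                         # drops the writer; flushes or
#                                            # triggers small-block repacking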