X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4115524460e89f36b99f2fb5de1adebd86daaf4a..31d31c010bb6b5170e3962fdd50c6d393cfe6076:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 4cc2591ebb..b9faa11c20 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1,18 +1,26 @@
-import functools
-import os
-import zlib
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import listitems, listvalues
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
 import bz2
-import config
-import hashlib
-import threading
-import Queue
+import collections
 import copy
 import errno
-import re
+import functools
+import hashlib
 import logging
-import collections
+import os
+import queue
+import re
+import sys
+import threading
 import uuid
+import zlib
 
+from . import config
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
 from ._normalize_stream import normalize_stream
@@ -38,6 +46,12 @@ def split(path):
         stream_name, file_name = '.', path
     return stream_name, file_name
 
+
+class UnownedBlockError(Exception):
+    """Raised when there's a writable block without an owner on the BlockManager."""
+    pass
+
+
 class _FileLikeObjectBase(object):
     def __init__(self, name, mode):
         self.name = name
@@ -70,7 +84,10 @@ class _FileLikeObjectBase(object):
 
 class ArvadosFileReaderBase(_FileLikeObjectBase):
     def __init__(self, name, mode, num_retries=None):
         super(ArvadosFileReaderBase, self).__init__(name, mode)
-        self._filepos = 0L
+        self._binary = 'b' in mode
+        if sys.version_info >= (3, 0) and not self._binary:
+            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
+        self._filepos = 0
         self.num_retries = num_retries
         self._readline_cache = (None, None)
 
@@ -90,17 +107,29 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             pos += self._filepos
         elif whence == os.SEEK_END:
             pos += self.size()
-        self._filepos = min(max(pos, 0L), self.size())
+        if pos < 0:
+            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
+        self._filepos = pos
+        return self._filepos
 
     def tell(self):
         return self._filepos
 
+    def readable(self):
+        return True
+
+    def writable(self):
+        return False
+
+    def seekable(self):
+        return True
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def readall(self, size=2**20, num_retries=None):
         while True:
             data = self.read(size, num_retries=num_retries)
-            if data == '':
+            if len(data) == 0:
                 break
             yield data
 
@@ -112,23 +141,23 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
-            data = ['']
+            data = [b'']
        data_size = len(data[-1])
-        while (data_size < size) and ('\n' not in data[-1]):
+        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
-        data = ''.join(data)
+        data = b''.join(data)
        try:
-            nextline_index = data.index('\n') + 1
+            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
-        return data[:nextline_index]
+        return data[:nextline_index].decode()
 
    @_FileLikeObjectBase._before_close
    @retry_method
@@ -163,16 +192,16 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
            data_size += len(s)
            if data_size >= sizehint:
                break
-        return ''.join(data).splitlines(True)
+        return b''.join(data).decode().splitlines(True)
 
    def size(self):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
    def read(self, size, num_retries=None):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
    def readfrom(self, start, size, num_retries=None):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
 
 class StreamFileReader(ArvadosFileReaderBase):
@@ -200,15 +229,15 @@ class StreamFileReader(ArvadosFileReaderBase):
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
-            return ''
+            return b''
 
-        data = ''
+        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
-                                          lr.segment_size,
-                                          num_retries=num_retries)
+                                         lr.segment_size,
+                                         num_retries=num_retries)
 
        self._filepos += len(data)
        return data
@@ -218,13 +247,13 @@ class StreamFileReader(ArvadosFileReaderBase):
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
-            return ''
+            return b''
 
        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
-        return ''.join(data)
+        return b''.join(data)
 
    def as_manifest(self):
        segs = []
@@ -304,6 +333,8 @@ class _BufferBlock(object):
 
        """
        if self._state == _BufferBlock.WRITABLE:
+            if not isinstance(data, bytes) and not isinstance(data, memoryview):
+                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
@@ -404,7 +435,7 @@ class _BlockManager(object):
    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None):
+    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
@@ -414,11 +445,15 @@ class _BlockManager(object):
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
-        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        if put_threads:
+            self.num_put_threads = put_threads
+        else:
+            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
+        self.padding_block = None
 
    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -498,10 +533,10 @@ class _BlockManager(object):
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
+            self._put_queue = queue.Queue(maxsize=2)
 
            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
+            for i in range(0, self.num_put_threads):
                thread = threading.Thread(target=self._commit_bufferblock_worker)
                self._put_threads.append(thread)
                thread.daemon = True
@@ -521,9 +556,9 @@ class _BlockManager(object):
    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
-            self._prefetch_queue = Queue.Queue()
+            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
-            for i in xrange(0, self.num_get_threads):
+            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
@@ -568,7 +603,11 @@ class _BlockManager(object):
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
-        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        try:
+            small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        except AttributeError:
+            # Writable blocks without owner shouldn't exist.
+            raise UnownedBlockError()
 
        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
@@ -640,6 +679,22 @@ class _BlockManager(object):
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)
 
+    @synchronized
+    def get_padding_block(self):
+        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
+        when using truncate() to extend the size of a file.
+
+        For reference (and possible future optimization), the md5sum of the
+        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
+
+        """
+
+        if self.padding_block is None:
+            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
+            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
+            self.commit_bufferblock(self.padding_block, False)
+        return self.padding_block
+
    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)
@@ -678,7 +733,7 @@ class _BlockManager(object):
        self.repack_small_blocks(force=True, sync=True)
 
        with self.lock:
-            items = self._bufferblocks.items()
+            items = listitems(self._bufferblocks)
 
        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
@@ -798,7 +853,7 @@ class ArvadosFile(object):
 
            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 
-        self._committed = False
+        self.set_committed(False)
 
    def __eq__(self, other):
        if other is self:
@@ -810,7 +865,7 @@ class ArvadosFile(object):
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
-            for i in xrange(0, len(othersegs)):
+            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
@@ -838,9 +893,18 @@ class ArvadosFile(object):
        self._segments = segs
 
    @synchronized
-    def set_committed(self):
-        """Set committed flag to True"""
-        self._committed = True
+    def set_committed(self, value=True):
+        """Set committed flag.
+
+        If value is True, set committed to be True.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        self._committed = value
+        if self._committed is False and self.parent is not None:
+            self.parent.set_committed(False)
 
    @synchronized
    def committed(self):
@@ -861,7 +925,7 @@ class ArvadosFile(object):
        """
        self._writers.remove(writer)
 
-        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
@@ -878,11 +942,11 @@ class ArvadosFile(object):
    @must_be_writable
    @synchronized
    def truncate(self, size):
-        """Shrink the size of the file.
+        """Shrink or expand the size of the file.
 
        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
-        of the file, an IOError will be raised.
+        of the file, it will be filled with zero bytes.
 
        """
        if size < self.size():
@@ -901,9 +965,19 @@ class ArvadosFile(object):
                    new_segs.append(r)
 
            self._segments = new_segs
-            self._committed = False
+            self.set_committed(False)
        elif size > self.size():
-            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
+            padding = self.parent._my_block_manager().get_padding_block()
+            diff = size - self.size()
+            while diff > config.KEEP_BLOCK_SIZE:
+                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
+                diff -= config.KEEP_BLOCK_SIZE
+            if diff > 0:
+                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
+            self.set_committed(False)
+        else:
+            # size == self.size()
+            pass
 
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.
@@ -916,7 +990,7 @@ class ArvadosFile(object):
 
        with self.lock:
            if size == 0 or offset >= self.size():
-                return ''
+                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
@@ -936,26 +1010,31 @@ class ArvadosFile(object):
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)
 
-        return ''.join(data)
+        return b''.join(data)
 
    def _repack_writes(self, num_retries):
-        """Test if the buffer block has more data than actual segments.
+        """Optimize buffer block by repacking segments in file sequence.
 
-        This happens when a buffered write over-writes a file range written in
-        a previous buffered write.  Re-pack the buffer block for efficiency
-        and to avoid leaking information.
+        When the client makes random writes, they appear in the buffer block in
+        the sequence they were written rather than the sequence they appear in
+        the file.  This makes for inefficient, fragmented manifests.  Attempt
+        to optimize by repacking writes in file sequence.
 
        """
        segs = self._segments
 
-        # Sum up the segments to get the total bytes of the file referencing
-        # into the buffer block.
+        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
+
+        # Collect total data referenced by segments (could be smaller than
+        # bufferblock size if a portion of the file was written and
+        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])
 
-        if write_total < self._current_bblock.size():
-            # There is more data in the buffer block than is actually accounted for by segments, so
-            # re-pack into a new buffer by copying over to a new buffer block.
+        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
+            # If there's more than one segment referencing this block, it is
+            # due to out-of-order writes and will produce a fragmented
+            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
@@ -973,11 +1052,13 @@ class ArvadosFile(object):
        necessary.
 
        """
+        if not isinstance(data, bytes) and not isinstance(data, memoryview):
+            data = data.encode()
        if len(data) == 0:
            return
 
        if offset > self.size():
-            raise ArgumentError("Offset is past the end of the file")
+            self.truncate(offset)
 
        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
@@ -988,7 +1069,7 @@ class ArvadosFile(object):
                n += config.KEEP_BLOCK_SIZE
            return
 
-        self._committed = False
+        self.set_committed(False)
 
        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1051,7 +1132,7 @@ class ArvadosFile(object):
 
    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
-        self._committed = False
+        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
@@ -1067,12 +1148,15 @@ class ArvadosFile(object):
            return 0
 
    @synchronized
-    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+    def manifest_text(self, stream_name=".", portable_locators=False,
+                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self.segments:
            loc = segment.locator
-            if loc.startswith("bufferblock"):
+            if self.parent._my_block_manager().is_bufferblock(loc):
+                if only_committed:
+                    continue
                loc = self._bufferblocks[loc].calculate_locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
@@ -1085,7 +1169,7 @@ class ArvadosFile(object):
    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
-        self._committed = False
+        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
@@ -1101,8 +1185,8 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
    """
 
-    def __init__(self, arvadosfile, num_retries=None):
-        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
+    def __init__(self, arvadosfile, mode="r", num_retries=None):
+        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile
 
    def size(self):
@@ -1126,7 +1210,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
-            return ''.join(data)
+            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
@@ -1154,10 +1238,12 @@ class ArvadosFileWriter(ArvadosFileReader):
    """
 
    def __init__(self, arvadosfile, mode, num_retries=None):
-        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
-        self.mode = mode
+        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)
 
+    def writable(self):
+        return True
+
    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
@@ -1179,8 +1265,6 @@ class ArvadosFileWriter(ArvadosFileReader):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
-        if self._filepos > self.size():
-            self._filepos = self.size()
 
    @_FileLikeObjectBase._before_close
    def flush(self):
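
A minimal usage sketch of what this patch changes (not part of the diff above): truncate() and writes past EOF now zero-fill the gap using the shared padding block, seek() no longer clamps to the file size, and reads return bytes under Python 3. This assumes the Collection.open() API from this SDK at this revision; the file name "sparse.bin" is illustrative only.

    import arvados.collection

    coll = arvados.collection.Collection()
    with coll.open("sparse.bin", "wb") as f:
        f.write(b"header")
        f.truncate(1024)      # formerly raised IOError(EINVAL); now zero-fills out to 1024 bytes
        f.seek(2048)          # seeking past EOF no longer clamps to size()
        f.write(b"trailer")   # a write past EOF extends the file via truncate(offset)

    with coll.open("sparse.bin", "rb") as f:
        data = f.read(4096)   # returns bytes (not str) under Python 3

    assert len(data) == 2048 + len(b"trailer")
    assert data[6:1024] == b"\0" * (1024 - 6)   # gap is zero-padded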