X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a830b5b560251c3143a7b1fd60db3f50a7021b34..5c05b488a1f2318e73780b67f6f654b4a12c32b3:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 7d6d67654a..610fd7dc13 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -10,6 +10,8 @@ import copy import errno import re import logging +import collections +import uuid from .errors import KeepWriteError, AssertionError, ArgumentError from .keep import KeepLocator @@ -108,6 +110,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): cache_pos, cache_data = self._readline_cache if self.tell() == cache_pos: data = [cache_data] + self._filepos += len(cache_data) else: data = [''] data_size = len(data[-1]) @@ -123,13 +126,14 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): except ValueError: nextline_index = len(data) nextline_index = min(nextline_index, size) + self._filepos -= len(data) - nextline_index self._readline_cache = (self.tell(), data[nextline_index:]) return data[:nextline_index] @_FileLikeObjectBase._before_close @retry_method def decompress(self, decompress, size, num_retries=None): - for segment in self.readall(size, num_retries): + for segment in self.readall(size, num_retries=num_retries): data = decompress(segment) if data: yield data @@ -311,28 +315,30 @@ class _BufferBlock(object): else: raise AssertionError("Buffer block is not writable") + STATE_TRANSITIONS = frozenset([ + (WRITABLE, PENDING), + (PENDING, COMMITTED), + (PENDING, ERROR), + (ERROR, PENDING)]) + @synchronized def set_state(self, nextstate, val=None): - if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or - (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED) or - (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.ERROR) or - (self._state == _BufferBlock.ERROR and nextstate == _BufferBlock.PENDING)): - self._state = nextstate - - if self._state == _BufferBlock.PENDING: - self.wait_for_commit.clear() - - if self._state == _BufferBlock.COMMITTED: - self._locator = val - self.buffer_view = None - self.buffer_block = None - self.wait_for_commit.set() - - if self._state == _BufferBlock.ERROR: - self.error = val - self.wait_for_commit.set() - else: - raise StateChangeError("Invalid state change from %s to %s" % (self.state, nextstate), self.state, nextstate) + if (self._state, nextstate) not in self.STATE_TRANSITIONS: + raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate) + self._state = nextstate + + if self._state == _BufferBlock.PENDING: + self.wait_for_commit.clear() + + if self._state == _BufferBlock.COMMITTED: + self._locator = val + self.buffer_view = None + self.buffer_block = None + self.wait_for_commit.set() + + if self._state == _BufferBlock.ERROR: + self.error = val + self.wait_for_commit.set() @synchronized def state(self): @@ -398,10 +404,10 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep): + def __init__(self, keep, copies=None): """keep: KeepClient object to use""" self._keep = keep - self._bufferblocks = {} + self._bufferblocks = collections.OrderedDict() self._put_queue = None self._put_threads = None self._prefetch_queue = None @@ -410,6 +416,9 @@ class _BlockManager(object): self.prefetch_enabled = True self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.copies = copies + self._pending_write_size = 0 + self.threads_lock = threading.Lock() @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -425,8 +434,11 @@ class _BlockManager(object): ArvadosFile that owns this block """ + return self._alloc_bufferblock(blockid, starting_capacity, owner) + + def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): if blockid is None: - blockid = "bufferblock%i" % len(self._bufferblocks) + blockid = "%s" % uuid.uuid4() bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -460,7 +472,10 @@ class _BlockManager(object): if bufferblock is None: return - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + else: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) bufferblock.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: @@ -469,28 +484,28 @@ class _BlockManager(object): if self._put_queue is not None: self._put_queue.task_done() - @synchronized def start_put_threads(self): - if self._put_threads is None: - # Start uploader threads. - - # If we don't limit the Queue size, the upload queue can quickly - # grow to take up gigabytes of RAM if the writing process is - # generating data more quickly than it can be send to the Keep - # servers. - # - # With two upload threads and a queue size of 2, this means up to 4 - # blocks pending. If they are full 64 MiB blocks, that means up to - # 256 MiB of internal buffering, which is the same size as the - # default download block cache in KeepClient. - self._put_queue = Queue.Queue(maxsize=2) - - self._put_threads = [] - for i in xrange(0, self.num_put_threads): - thread = threading.Thread(target=self._commit_bufferblock_worker) - self._put_threads.append(thread) - thread.daemon = False - thread.start() + with self.threads_lock: + if self._put_threads is None: + # Start uploader threads. + + # If we don't limit the Queue size, the upload queue can quickly + # grow to take up gigabytes of RAM if the writing process is + # generating data more quickly than it can be send to the Keep + # servers. + # + # With two upload threads and a queue size of 2, this means up to 4 + # blocks pending. If they are full 64 MiB blocks, that means up to + # 256 MiB of internal buffering, which is the same size as the + # default download block cache in KeepClient. + self._put_queue = Queue.Queue(maxsize=2) + + self._put_threads = [] + for i in xrange(0, self.num_put_threads): + thread = threading.Thread(target=self._commit_bufferblock_worker) + self._put_threads.append(thread) + thread.daemon = True + thread.start() def _block_prefetch_worker(self): """The background downloader thread.""" @@ -541,8 +556,42 @@ class _BlockManager(object): def __exit__(self, exc_type, exc_value, traceback): self.stop_threads() - def __del__(self): - self.stop_threads() + @synchronized + def repack_small_blocks(self, force=False, sync=False, closed_file_size=0): + """Packs small blocks together before uploading""" + self._pending_write_size += closed_file_size + + # Check if there are enough small blocks for filling up one in full + if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE): + + # Search blocks ready for getting packed together before being committed to Keep. + # A WRITABLE block always has an owner. + # A WRITABLE block with its owner.closed() implies that it's + # size is <= KEEP_BLOCK_SIZE/2. + small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return + + # Update the pending write size count with its true value, just in case + # some small file was opened, written and closed several times. + self._pending_write_size = sum([b.size() for b in small_blocks]) + if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: + return + + new_bb = self._alloc_bufferblock() + while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + arvfile = bb.owner + self._pending_write_size -= bb.size() + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + arvfile.set_segments([Range(new_bb.blockid, + 0, + bb.size(), + new_bb.write_pointer - bb.size())]) + self._delete_bufferblock(bb.blockid) + self.commit_bufferblock(new_bb, sync=sync) def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@ -557,20 +606,28 @@ class _BlockManager(object): which case it will wait on an upload queue slot. """ - try: # Mark the block as PENDING so to disallow any more appends. block.set_state(_BufferBlock.PENDING) except StateChangeError as e: - if e.state == _BufferBlock.PENDING and sync: - block.wait_for_commit.wait() - if block.state() == _BufferBlock.ERROR: - raise block.error - return + if e.state == _BufferBlock.PENDING: + if sync: + block.wait_for_commit.wait() + else: + return + if block.state() == _BufferBlock.COMMITTED: + return + elif block.state() == _BufferBlock.ERROR: + raise block.error + else: + raise if sync: try: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + else: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e) @@ -585,6 +642,9 @@ class _BlockManager(object): @synchronized def delete_bufferblock(self, locator): + self._delete_bufferblock(locator) + + def _delete_bufferblock(self, locator): bb = self._bufferblocks[locator] bb.clear() del self._bufferblocks[locator] @@ -615,11 +675,13 @@ class _BlockManager(object): are uploaded. Raises KeepWriteError() if any blocks failed to upload. """ + self.repack_small_blocks(force=True, sync=True) + with self.lock: items = self._bufferblocks.items() for k,v in items: - if v.state() != _BufferBlock.COMMITTED: + if v.state() != _BufferBlock.COMMITTED and v.owner: v.owner.flush(sync=False) with self.lock: @@ -686,6 +748,7 @@ class ArvadosFile(object): """ self.parent = parent self.name = name + self._writers = set() self._committed = False self._segments = [] self.lock = parent.root_collection().lock @@ -762,9 +825,13 @@ class ArvadosFile(object): def __ne__(self, other): return not self.__eq__(other) + @synchronized + def set_segments(self, segs): + self._segments = segs + @synchronized def set_committed(self): - """Set committed flag to False""" + """Set committed flag to True""" self._committed = True @synchronized @@ -772,6 +839,34 @@ class ArvadosFile(object): """Get whether this is committed or not.""" return self._committed + @synchronized + def add_writer(self, writer): + """Add an ArvadosFileWriter reference to the list of writers""" + if isinstance(writer, ArvadosFileWriter): + self._writers.add(writer) + + @synchronized + def remove_writer(self, writer, flush): + """ + Called from ArvadosFileWriter.close(). Remove a writer reference from the list + and do some block maintenance tasks. + """ + self._writers.remove(writer) + + if flush or self.size() > config.KEEP_BLOCK_SIZE / 2: + # File writer closed, not small enough for repacking + self.flush() + elif self.closed(): + # All writers closed and size is adequate for repacking + self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size()) + + def closed(self): + """ + Get whether this is closed or not. When the writers list is empty, the file + is supposed to be closed. + """ + return len(self._writers) == 0 + @must_be_writable @synchronized def truncate(self, size): @@ -927,7 +1022,7 @@ class ArvadosFile(object): bb = self.parent._my_block_manager().get_bufferblock(s.locator) if bb: if bb.state() != _BufferBlock.COMMITTED: - self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=True) + self.parent._my_block_manager().commit_bufferblock(bb, sync=True) to_delete.add(s.locator) s.locator = bb.locator() for s in to_delete: @@ -1053,6 +1148,7 @@ class ArvadosFileWriter(ArvadosFileReader): def __init__(self, arvadosfile, mode, num_retries=None): super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries) self.mode = mode + self.arvadosfile.add_writer(self) @_FileLikeObjectBase._before_close @retry_method @@ -1068,7 +1164,7 @@ class ArvadosFileWriter(ArvadosFileReader): @retry_method def writelines(self, seq, num_retries=None): for s in seq: - self.write(s, num_retries) + self.write(s, num_retries=num_retries) @_FileLikeObjectBase._before_close def truncate(self, size=None): @@ -1082,7 +1178,7 @@ class ArvadosFileWriter(ArvadosFileReader): def flush(self): self.arvadosfile.flush() - def close(self): + def close(self, flush=True): if not self.closed: - self.flush() + self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close()