X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e21c8f8c0ba6f2012bc4108f8fa27411ab4375d1..95fc98726d64c71ed0b3a8c2270ee62c1c5d1bb5:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index a043bee751..7ee028db2a 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -10,6 +10,8 @@ import copy
 import errno
 import re
 import logging
+import collections
+import uuid
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -405,7 +407,7 @@ class _BlockManager(object):
     def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
         self._prefetch_queue = None
@@ -415,6 +417,8 @@ class _BlockManager(object):
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
+        self._pending_write_size = 0
+        self.threads_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -430,8 +434,11 @@ class _BlockManager(object):
           ArvadosFile that owns this block
 
         """
+        return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
+    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "bufferblock%i" % len(self._bufferblocks)
+            blockid = "%s" % uuid.uuid4()
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -477,28 +484,28 @@ class _BlockManager(object):
             if self._put_queue is not None:
                 self._put_queue.task_done()
 
-    @synchronized
     def start_put_threads(self):
-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be sent to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-
-            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
-                thread = threading.Thread(target=self._commit_bufferblock_worker)
-                self._put_threads.append(thread)
-                thread.daemon = True
-                thread.start()
+        with self.threads_lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+
+                self._put_threads = []
+                for i in xrange(0, self.num_put_threads):
+                    thread = threading.Thread(target=self._commit_bufferblock_worker)
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
     def _block_prefetch_worker(self):
         """The background downloader thread."""
@@ -549,28 +556,42 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
-    def repack_small_blocks(self, force=False, sync=False):
+    @synchronized
+    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
-        # Search blocks ready for getting packed together before being committed to Keep
-        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
-        if len(small_blocks) <= 1:
-            # Not enough small blocks for repacking
-            return
+        self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
-            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
-            self._bufferblocks[new_bb.blockid] = new_bb
-            size = 0
-            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+            # Search blocks ready for getting packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block with its owner.closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+            if len(small_blocks) <= 1:
+                # Not enough small blocks for repacking
+                return
+
+            # Update the pending write size count with its true value, just in case
+            # some small file was opened, written and closed several times.
+            if not force:
+                self._pending_write_size = sum([b.size() for b in small_blocks])
+                if self._pending_write_size < config.KEEP_BLOCK_SIZE:
+                    return
+
+            new_bb = self._alloc_bufferblock()
+            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
-                size += bb.size()
                 arvfile = bb.owner
+                self._pending_write_size -= bb.size()
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
-                bb.clear()
-                del self._bufferblocks[bb.blockid]
+                arvfile.set_segments([Range(new_bb.blockid,
+                                            0,
+                                            bb.size(),
+                                            new_bb.write_pointer - bb.size())])
+                self._delete_bufferblock(bb.blockid)
             self.commit_bufferblock(new_bb, sync=sync)
 
     def commit_bufferblock(self, block, sync):
@@ -622,6 +643,9 @@ class _BlockManager(object):
 
     @synchronized
     def delete_bufferblock(self, locator):
+        self._delete_bufferblock(locator)
+
+    def _delete_bufferblock(self, locator):
         bb = self._bufferblocks[locator]
         bb.clear()
         del self._bufferblocks[locator]
@@ -652,8 +676,9 @@ class _BlockManager(object):
         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 
         """
+        self.repack_small_blocks(force=True, sync=True)
+
         with self.lock:
-            self.repack_small_blocks(force=True, sync=True)
             items = self._bufferblocks.items()
 
         for k,v in items:
@@ -822,19 +847,19 @@ class ArvadosFile(object):
             self._writers.add(writer)
 
     @synchronized
-    def remove_writer(self, writer):
+    def remove_writer(self, writer, flush):
         """
         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
         and do some block maintenance tasks.
         """
         self._writers.remove(writer)
 
-        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
             # File writer closed, not small enough for repacking
             self.flush()
         elif self.closed():
             # All writers closed and size is adequate for repacking
-            self.parent._my_block_manager().repack_small_blocks()
+            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 
     def closed(self):
         """
@@ -1154,7 +1179,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.arvadosfile.remove_writer(self)
+            self.arvadosfile.remove_writer(self, flush)
         super(ArvadosFileWriter, self).close()