X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0c0f18dfbcdcf552889258b76563315fbe2eb060..5c05b488a1f2318e73780b67f6f654b4a12c32b3:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 71af6445a5..610fd7dc13 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -10,6 +10,8 @@ import copy import errno import re import logging +import collections +import uuid from .errors import KeepWriteError, AssertionError, ArgumentError from .keep import KeepLocator @@ -108,6 +110,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): cache_pos, cache_data = self._readline_cache if self.tell() == cache_pos: data = [cache_data] + self._filepos += len(cache_data) else: data = [''] data_size = len(data[-1]) @@ -123,6 +126,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): except ValueError: nextline_index = len(data) nextline_index = min(nextline_index, size) + self._filepos -= len(data) - nextline_index self._readline_cache = (self.tell(), data[nextline_index:]) return data[:nextline_index] @@ -400,10 +404,10 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep): + def __init__(self, keep, copies=None): """keep: KeepClient object to use""" self._keep = keep - self._bufferblocks = {} + self._bufferblocks = collections.OrderedDict() self._put_queue = None self._put_threads = None self._prefetch_queue = None @@ -412,6 +416,9 @@ class _BlockManager(object): self.prefetch_enabled = True self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.copies = copies + self._pending_write_size = 0 + self.threads_lock = threading.Lock() @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -427,8 +434,11 @@ class _BlockManager(object): ArvadosFile that owns this block """ + return self._alloc_bufferblock(blockid, starting_capacity, owner) + + def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): if blockid is None: - blockid = "bufferblock%i" % len(self._bufferblocks) + blockid = "%s" % uuid.uuid4() bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -462,7 +472,10 @@ class _BlockManager(object): if bufferblock is None: return - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + else: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) bufferblock.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: @@ -471,28 +484,28 @@ class _BlockManager(object): if self._put_queue is not None: self._put_queue.task_done() - @synchronized def start_put_threads(self): - if self._put_threads is None: - # Start uploader threads. - - # If we don't limit the Queue size, the upload queue can quickly - # grow to take up gigabytes of RAM if the writing process is - # generating data more quickly than it can be send to the Keep - # servers. - # - # With two upload threads and a queue size of 2, this means up to 4 - # blocks pending. If they are full 64 MiB blocks, that means up to - # 256 MiB of internal buffering, which is the same size as the - # default download block cache in KeepClient. - self._put_queue = Queue.Queue(maxsize=2) - - self._put_threads = [] - for i in xrange(0, self.num_put_threads): - thread = threading.Thread(target=self._commit_bufferblock_worker) - self._put_threads.append(thread) - thread.daemon = True - thread.start() + with self.threads_lock: + if self._put_threads is None: + # Start uploader threads. + + # If we don't limit the Queue size, the upload queue can quickly + # grow to take up gigabytes of RAM if the writing process is + # generating data more quickly than it can be send to the Keep + # servers. + # + # With two upload threads and a queue size of 2, this means up to 4 + # blocks pending. If they are full 64 MiB blocks, that means up to + # 256 MiB of internal buffering, which is the same size as the + # default download block cache in KeepClient. + self._put_queue = Queue.Queue(maxsize=2) + + self._put_threads = [] + for i in xrange(0, self.num_put_threads): + thread = threading.Thread(target=self._commit_bufferblock_worker) + self._put_threads.append(thread) + thread.daemon = True + thread.start() def _block_prefetch_worker(self): """The background downloader thread.""" @@ -543,6 +556,43 @@ class _BlockManager(object): def __exit__(self, exc_type, exc_value, traceback): self.stop_threads() + @synchronized + def repack_small_blocks(self, force=False, sync=False, closed_file_size=0): + """Packs small blocks together before uploading""" + self._pending_write_size += closed_file_size + + # Check if there are enough small blocks for filling up one in full + if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE): + + # Search blocks ready for getting packed together before being committed to Keep. + # A WRITABLE block always has an owner. + # A WRITABLE block with its owner.closed() implies that it's + # size is <= KEEP_BLOCK_SIZE/2. + small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return + + # Update the pending write size count with its true value, just in case + # some small file was opened, written and closed several times. + self._pending_write_size = sum([b.size() for b in small_blocks]) + if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: + return + + new_bb = self._alloc_bufferblock() + while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + arvfile = bb.owner + self._pending_write_size -= bb.size() + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + arvfile.set_segments([Range(new_bb.blockid, + 0, + bb.size(), + new_bb.write_pointer - bb.size())]) + self._delete_bufferblock(bb.blockid) + self.commit_bufferblock(new_bb, sync=sync) + def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@ -556,7 +606,6 @@ class _BlockManager(object): which case it will wait on an upload queue slot. """ - try: # Mark the block as PENDING so to disallow any more appends. block.set_state(_BufferBlock.PENDING) @@ -575,7 +624,10 @@ class _BlockManager(object): if sync: try: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + else: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e) @@ -590,6 +642,9 @@ class _BlockManager(object): @synchronized def delete_bufferblock(self, locator): + self._delete_bufferblock(locator) + + def _delete_bufferblock(self, locator): bb = self._bufferblocks[locator] bb.clear() del self._bufferblocks[locator] @@ -620,11 +675,13 @@ class _BlockManager(object): are uploaded. Raises KeepWriteError() if any blocks failed to upload. """ + self.repack_small_blocks(force=True, sync=True) + with self.lock: items = self._bufferblocks.items() for k,v in items: - if v.state() != _BufferBlock.COMMITTED: + if v.state() != _BufferBlock.COMMITTED and v.owner: v.owner.flush(sync=False) with self.lock: @@ -691,6 +748,7 @@ class ArvadosFile(object): """ self.parent = parent self.name = name + self._writers = set() self._committed = False self._segments = [] self.lock = parent.root_collection().lock @@ -767,9 +825,13 @@ class ArvadosFile(object): def __ne__(self, other): return not self.__eq__(other) + @synchronized + def set_segments(self, segs): + self._segments = segs + @synchronized def set_committed(self): - """Set committed flag to False""" + """Set committed flag to True""" self._committed = True @synchronized @@ -777,6 +839,34 @@ class ArvadosFile(object): """Get whether this is committed or not.""" return self._committed + @synchronized + def add_writer(self, writer): + """Add an ArvadosFileWriter reference to the list of writers""" + if isinstance(writer, ArvadosFileWriter): + self._writers.add(writer) + + @synchronized + def remove_writer(self, writer, flush): + """ + Called from ArvadosFileWriter.close(). Remove a writer reference from the list + and do some block maintenance tasks. + """ + self._writers.remove(writer) + + if flush or self.size() > config.KEEP_BLOCK_SIZE / 2: + # File writer closed, not small enough for repacking + self.flush() + elif self.closed(): + # All writers closed and size is adequate for repacking + self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size()) + + def closed(self): + """ + Get whether this is closed or not. When the writers list is empty, the file + is supposed to be closed. + """ + return len(self._writers) == 0 + @must_be_writable @synchronized def truncate(self, size): @@ -1058,6 +1148,7 @@ class ArvadosFileWriter(ArvadosFileReader): def __init__(self, arvadosfile, mode, num_retries=None): super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries) self.mode = mode + self.arvadosfile.add_writer(self) @_FileLikeObjectBase._before_close @retry_method @@ -1087,7 +1178,7 @@ class ArvadosFileWriter(ArvadosFileReader): def flush(self): self.arvadosfile.flush() - def close(self): + def close(self, flush=True): if not self.closed: - self.flush() + self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close()