def worker(self):
    """
    Background uploader thread.

    Repeatedly takes buffer blocks off self._put_queue and uploads each
    one to Keep via self._keep.put().  A None item on the queue is the
    shutdown sentinel and makes the thread return.  Any exception raised
    while uploading is pushed onto self._put_errors for the foreground
    thread to inspect; the worker itself keeps servicing the queue.
    """
    while True:
        try:
            b = self._put_queue.get()
            if b is None:
                # Shutdown sentinel; the finally clause still marks
                # this queue item as done before the thread exits.
                return
            # Upload only the portion of the buffer actually written.
            b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
            b.state = BufferBlock.COMMITTED
            # Release the buffer memory now that the data is in Keep.
            b.buffer_view = None
            b.buffer_block = None
        except Exception as e:
            # Surface the failure to the foreground thread via the
            # error queue rather than printing it.
            self._put_errors.put(e)
        finally:
            if self._put_queue is not None:
                self._put_queue.task_done()
def __exit__(self, exc_type, exc_value, traceback):
    """Exit the runtime context: shut down the background upload threads.

    Always returns None (falsy), so any exception raised inside the
    ``with`` block is propagated to the caller rather than suppressed.
    """
    self.stop_threads()
    return None
@synchronized
def repack_small_blocks(self, force=False, sync=False):
    """Packs small blocks together before uploading

    :force:
        If True, repack even when the pending small blocks do not yet
        add up to a full Keep block (config.KEEP_BLOCK_SIZE).

    :sync:
        Passed through to commit_bufferblock(); controls whether the
        combined block is uploaded synchronously.
    """
    # Search blocks ready for getting packed together before being committed to Keep.
    # Candidates are still-writable buffer blocks whose owning file is closed.
    small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
    if len(small_blocks) <= 1:
        # Not enough small blocks for repacking
        return

    # Check if there are enough small blocks for filling up one in full
    pending_write_size = sum([b.size() for b in small_blocks])
    if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
        # NOTE(review): the blockid is derived from the current dict size,
        # which could collide with an existing id after earlier deletions
        # from self._bufferblocks — confirm.
        # 2**14 is the initial buffer capacity; assumes _BufferBlock.append()
        # grows the buffer as needed — TODO confirm.
        new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
        self._bufferblocks[new_bb.blockid] = new_bb
        size = 0
        # Greedily move small blocks into new_bb until the next one would
        # push it past a full Keep block.
        while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            size += bb.size()
            arvfile = bb.owner
            # Copy only the bytes actually written into the small block.
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            # Repoint the owning file at its slice of new_bb; size-bb.size()
            # is the offset where this block's data starts in new_bb.
            # Presumably Range(locator, start, size, segment_offset) — verify
            # against the Range definition.
            arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
            bb.clear()
            del self._bufferblocks[bb.blockid]
        self.commit_bufferblock(new_bb, sync=sync)
+ def commit_bufferblock(self, block, sync):
+ """Initiate a background upload of a bufferblock.
+
+ :block:
+ The block object to upload
+
+ :sync:
+ If `sync` is True, upload the block synchronously.
+ If `sync` is False, upload the block asynchronously. This will
+ return immediately unless the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.