- def commit_bufferblock(self, block):
- '''
- Initiate a background upload of a bufferblock. This will block if the
- upload queue is at capacity, otherwise it will return immediately.
- '''
-
- def worker(self):
- '''
- Background uploader thread.
- '''
- while True:
- try:
- b = self._put_queue.get()
- if b is None:
- return
- b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
- b.state = BufferBlock.COMMITTED
- b.buffer_view = None
- b.buffer_block = None
- except Exception as e:
- print e
- self._put_errors.put(e)
- finally:
- if self._put_queue is not None:
- self._put_queue.task_done()
-
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
- self._put_errors = Queue.Queue()
- self._put_threads = [threading.Thread(target=worker, args=(self,)),
- threading.Thread(target=worker, args=(self,))]
- for t in self._put_threads:
- t.daemon = True
- t.start()
-
- # Mark the block as PENDING so to disallow any more appends.
- block.state = BufferBlock.PENDING
- self._put_queue.put(block)
-
- def get_block(self, locator, num_retries, cache_only=False):
- '''
- Fetch a block. First checks to see if the locator is a BufferBlock and
- return that, if not, passes the request through to KeepClient.get().
- '''
- if locator in self._bufferblocks:
- bb = self._bufferblocks[locator]
- if bb.state != BufferBlock.COMMITTED:
- return bb.buffer_view[0:bb.write_pointer].tobytes()
+ def __enter__(self):
+ """Enter the context manager; returns this object unchanged."""
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """Leave the context manager by calling self.stop_threads().
+
+ Returns None (falsy), so an exception raised inside the `with` body
+ is propagated, not suppressed.
+
+ NOTE(review): stop_threads() is defined elsewhere in this class;
+ presumably it shuts down the background upload threads — confirm.
+ """
+ self.stop_threads()
+
+ @synchronized
+ def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
+ """Packs small blocks together before uploading.
+
+ Candidate blocks are those in the WRITABLE state whose owner reports
+ closed(). The method acts only when there are at least two candidates
+ and their combined size reaches config.KEEP_BLOCK_SIZE, unless `force`
+ is set. It then copies their contents, in order, into one newly
+ allocated buffer block for as long as the next candidate still fits
+ within config.KEEP_BLOCK_SIZE. That block is handed to
+ commit_bufferblock(). Each packed block's owner has its segments
+ re-pointed at the new block, and the packed blocks are deleted.
+ Candidates that do not fit are not packed: they remain in
+ self._bufferblocks, and their size stays counted in
+ self._pending_write_size.
+
+ :force:
+ Pack even when the pending size is below config.KEEP_BLOCK_SIZE.
+ At least two candidate blocks are still required.
+
+ :sync:
+ Forwarded to commit_bufferblock(). True uploads the packed block
+ synchronously; False uploads it asynchronously.
+
+ :closed_file_size:
+ Amount added to self._pending_write_size before anything else
+ (judging by its name, the size of a file that was just closed).
+
+ Raises UnownedBlockError if an AttributeError occurs while selecting
+ candidates, e.g. a WRITABLE block whose owner is missing.
+
+ NOTE(review): `@synchronized` is defined elsewhere; presumably it
+ serializes calls on this object — confirm.
+ """
+
+ self._pending_write_size += closed_file_size
+
+ # Check if there are enough small blocks for filling up one in full.
+ # This cheap check on the running counter avoids scanning every buffer
+ # block on each call.
+ if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+ return
+
+ # Search blocks ready for getting packed together before being
+ # committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
+ # Any AttributeError raised while filtering (not only one caused by a
+ # missing owner) is reported as UnownedBlockError.
+ try:
+ small_blocks = [b for b in listvalues(self._bufferblocks)
+ if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ except AttributeError:
+ # Writable blocks without owner shouldn't exist.
+ raise UnownedBlockError()
+
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking (at least two are needed).
+ return
+
+ # NOTE(review): repack_writes() is defined on the block class elsewhere;
+ # presumably it compacts each block's buffer before its size is
+ # measured below — confirm.
+ for bb in small_blocks:
+ bb.repack_writes()
+
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ self._pending_write_size = sum([b.size() for b in small_blocks])
+
+ # Re-check against the corrected total; `force` still bypasses it.
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
+
+ # The packed block holds data from several files, so its owner is a
+ # list of their owners (one appended per packed block).
+ new_bb = self._alloc_bufferblock()
+ new_bb.owner = []
+ # files collects (old block, offset of its data inside new_bb) pairs.
+ files = []
+ # Take candidates in order while the next one still fits; stop at the
+ # first one that would overflow config.KEEP_BLOCK_SIZE.
+ while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+ bb = small_blocks.pop(0)
+ new_bb.owner.append(bb.owner)
+ self._pending_write_size -= bb.size()
+ new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+ # Offset where bb's data starts inside new_bb. This assumes append()
+ # advances write_pointer by the appended length and that
+ # bb.size() == bb.write_pointer — TODO confirm.
+ files.append((bb, new_bb.write_pointer - bb.size()))
+
+ self.commit_bufferblock(new_bb, sync=sync)
+
+ # Re-point every segment that referenced an old small block at the
+ # packed block, shifting its offset by where the old data landed, then
+ # drop the old block.
+ for bb, new_bb_segment_offset in files:
+ newsegs = bb.owner.segments()
+ for s in newsegs:
+ if s.locator == bb.blockid:
+ s.locator = new_bb.blockid
+ s.segment_offset = new_bb_segment_offset+s.segment_offset
+ bb.owner.set_segments(newsegs)
+ self._delete_bufferblock(bb.blockid)
+
+ def commit_bufferblock(self, block, sync):
+ """Initiate a background upload of a bufferblock.
+
+ :block:
+ The block object to upload
+
+ :sync:
+ If `sync` is True, upload the block synchronously.
+ If `sync` is False, upload the block asynchronously. This will
+ return immediately unless the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.
+
+ """
+ try:
+ # Mark the block as PENDING so to disallow any more appends.
+ block.set_state(_BufferBlock.PENDING)
+ except StateChangeError as e:
+ if e.state == _BufferBlock.PENDING:
+ if sync:
+ block.wait_for_commit.wait()
+ else:
+ return
+ if block.state() == _BufferBlock.COMMITTED:
+ return
+ elif block.state() == _BufferBlock.ERROR:
+ raise block.error