- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be sent to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
- self._put_errors = Queue.Queue()
- self._put_threads = [threading.Thread(target=worker, args=(self,)),
- threading.Thread(target=worker, args=(self,))]
- for t in self._put_threads:
- t.daemon = True
- t.start()
-
- # Mark the block as PENDING so as to disallow any more appends.
- block.state = BufferBlock.PENDING
- self._put_queue.put(block)
-
- def get_block(self, locator, num_retries, cache_only=False):
- '''
- Fetch a block. First checks to see if the locator is a BufferBlock and
- returns that; if not, passes the request through to KeepClient.get().
- '''
- if locator in self._bufferblocks:
- bb = self._bufferblocks[locator]
- if bb.state != BufferBlock.COMMITTED:
- return bb.buffer_view[0:bb.write_pointer].tobytes()
+ def commit_bufferblock(self, block, sync):
+ """Initiate a background upload of a bufferblock.
+
+ :block:
+ The block object to upload
+
+ :sync:
+ If `sync` is True, upload the block synchronously.
+ If `sync` is False, upload the block asynchronously. This will
+ return immediately unless the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.
+
+ """
+
+ try:
+ # Mark the block as PENDING so as to disallow any more appends.
+ block.set_state(_BufferBlock.PENDING)
+ except StateChangeError as e:
+ if e.state == _BufferBlock.PENDING:
+ if sync:
+ block.wait_for_commit.wait()
+ else:
+ return
+ if block.state() == _BufferBlock.COMMITTED:
+ return
+ elif block.state() == _BufferBlock.ERROR:
+ raise block.error