+
@synchronized
def start_get_threads(self):
    """Start the background download (prefetch) worker threads, if not already running."""
    if self._prefetch_threads is not None:
        # Pool already exists; starting again would leak threads.
        return
    self._prefetch_queue = Queue.Queue()
    self._prefetch_threads = []
    for _ in xrange(self.num_get_threads):
        worker = threading.Thread(target=self._block_prefetch_worker)
        self._prefetch_threads.append(worker)
        # Daemon threads do not block interpreter shutdown.
        worker.daemon = True
        worker.start()
+
+
@synchronized
def stop_threads(self):
    """Shut down and wait for background upload and download threads to finish."""
    # Upload pool: wake each worker with a None sentinel, then join them all.
    if self._put_threads is not None:
        for _ in self._put_threads:
            self._put_queue.put(None)
        for worker in self._put_threads:
            worker.join()
        self._put_threads = None
        self._put_queue = None

    # Same sentinel/join shutdown protocol for the prefetch (download) pool.
    if self._prefetch_threads is not None:
        for _ in self._prefetch_threads:
            self._prefetch_queue.put(None)
        for worker in self._prefetch_threads:
            worker.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.stop_threads()
+
def commit_bufferblock(self, block, sync):
    """Initiate a background upload of a bufferblock.

    :block:
      The block object to upload

    :sync:
      If `sync` is True, upload the block synchronously.
      If `sync` is False, upload the block asynchronously. This will
      return immediately unless the upload queue is at capacity, in
      which case it will wait on an upload queue slot.

    """

    try:
        # Mark the block as PENDING so to disallow any more appends.
        block.set_state(_BufferBlock.PENDING)
    except StateChangeError as e:
        # The transition failed: somebody else already moved the block on.
        if e.state == _BufferBlock.PENDING:
            # An upload is already in flight (started by another caller).
            if sync:
                # Caller wants synchronous semantics: wait for that upload.
                block.wait_for_commit.wait()
            else:
                # Async caller: the in-flight upload is good enough.
                return
        # After waiting (or for any other origin state), decide by outcome.
        if block.state() == _BufferBlock.COMMITTED:
            return
        elif block.state() == _BufferBlock.ERROR:
            # Surface the failure recorded by the uploader.
            raise block.error
        else:
            # Unexpected state: re-raise the original StateChangeError.
            raise

    if sync:
        try:
            # Upload in the calling thread; honor the configured replica
            # count when one was given.
            if self.copies is None:
                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
            else:
                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
            block.set_state(_BufferBlock.COMMITTED, loc)
        except Exception as e:
            # Record the failure on the block so waiters see it, then re-raise.
            block.set_state(_BufferBlock.ERROR, e)
            raise
    else:
        # Hand the block to the upload worker pool (may block if the
        # upload queue is at capacity).
        self.start_put_threads()
        self._put_queue.put(block)
+
@synchronized
def get_bufferblock(self, locator):
    """Return the bufferblock for `locator`, or None if there is none."""
    found = self._bufferblocks.get(locator)
    return found
+
@synchronized
def delete_bufferblock(self, locator):
    """Release a bufferblock's storage and drop it from the table.

    Raises KeyError if the locator is unknown.  The buffer is cleared
    before the table entry is removed.
    """
    doomed = self._bufferblocks[locator]
    doomed.clear()
    del self._bufferblocks[locator]
+
def get_block_contents(self, locator, num_retries, cache_only=False):
    """Fetch a block.

    First checks to see if the locator is a BufferBlock and return that, if
    not, passes the request through to KeepClient.get().

    """
    with self.lock:
        bufferblock = self._bufferblocks.get(locator)
        if bufferblock is not None:
            if bufferblock.state() != _BufferBlock.COMMITTED:
                # Still local-only: serve the bytes straight from the buffer.
                return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
            # Committed: fall through and fetch via its permanent locator.
            locator = bufferblock._locator
    # Keep fetch happens outside the lock.
    if cache_only:
        return self._keep.get_from_cache(locator)
    return self._keep.get(locator, num_retries=num_retries)
+
def commit_all(self):
    """Commit all outstanding buffer blocks.

    This is a synchronous call, and will not return until all buffer blocks
    are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

    """
    # Snapshot the table under the lock; the flushes below may mutate it.
    with self.lock:
        pending = self._bufferblocks.items()

    # Kick off asynchronous uploads for anything not yet committed.
    for _, block in pending:
        if block.state() != _BufferBlock.COMMITTED:
            block.owner.flush(sync=False)

    # Wait for the upload worker pool to drain its queue.
    with self.lock:
        if self._put_queue is not None:
            self._put_queue.join()

    failures = [(block.locator(), block.error)
                for _, block in pending
                if block.state() == _BufferBlock.ERROR]
    if failures:
        raise KeepWriteError("Error writing some blocks", failures, label="block")

    # flush again with sync=True to remove committed bufferblocks from
    # the segments.
    for _, block in pending:
        if block.owner:
            block.owner.flush(sync=True)