def worker(self):
    '''
    Background uploader thread.

    Loops forever taking buffer blocks from self._put_queue.  For each
    block, the written part of its buffer (bytes 0..write_pointer) is passed
    to self._keep.put(); the returned locator is stored on the block, the
    block is marked COMMITTED, and its buffer references are dropped.

    A None item on the queue is the shutdown sentinel: the thread returns.

    Exceptions never escape the thread.  Any failure is placed on
    self._put_errors so that commit_all() can collect and report it.
    '''
    while True:
        try:
            b = self._put_queue.get()
            if b is None:
                return
            b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
            b.state = BufferBlock.COMMITTED
            # Drop the buffer references once the upload has succeeded.
            b.buffer_view = None
            b.buffer_block = None
        except Exception as e:
            # Report through the error queue only; do not write to stdout
            # from a background thread.
            self._put_errors.put(e)
        finally:
            # Balance every get() with task_done() (including the None
            # sentinel and failed uploads) so that Queue.join() can return.
            if self._put_queue is not None:
                self._put_queue.task_done()
+
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be sent to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+ self._put_errors = Queue.Queue()
+ self._put_threads = [threading.Thread(target=worker, args=(self,)),
+ threading.Thread(target=worker, args=(self,))]
+ for t in self._put_threads:
+ t.daemon = True
+ t.start()
+
+ # Mark the block as PENDING so as to disallow any more appends.
+ block.state = BufferBlock.PENDING
+ self._put_queue.put(block)
+
def get_block(self, locator, num_retries, cache_only=False):
    '''
    Return the contents of a block.

    If the locator names an entry in self._bufferblocks that is not yet
    COMMITTED, the bytes written so far (0..write_pointer) are returned
    straight from its buffer.  If that entry is COMMITTED, the locator
    stored on it is used in place of the one given.  In every other case
    the request is forwarded to self._keep.get() with num_retries and
    cache_only.
    '''
    if locator in self._bufferblocks:
        block = self._bufferblocks[locator]
        if block.state != BufferBlock.COMMITTED:
            written = block.buffer_view[0:block.write_pointer]
            return written.tobytes()
        # Already uploaded: look it up under the locator recorded at upload.
        locator = block._locator

    return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
+
def commit_all(self):
    '''
    Synchronously commit every outstanding buffer block.

    Each block in self._bufferblocks that is still WRITABLE is handed to
    commit_bufferblock().  If an upload queue exists, this call then waits
    for the queue to drain before returning.

    Raises AsyncKeepWriteErrors, carrying the list of collected errors, if
    any upload put an error on self._put_errors.
    '''
    for bufferblock in self._bufferblocks.values():
        if bufferblock.state == BufferBlock.WRITABLE:
            self.commit_bufferblock(bufferblock)

    # No upload queue means no uploader threads were ever started.
    if self._put_queue is None:
        return

    self._put_queue.join()
    if self._put_errors.empty():
        return

    # Drain every queued error so they are all reported together.
    failures = []
    while True:
        try:
            failures.append(self._put_errors.get(False))
        except Queue.Empty:
            break
    raise AsyncKeepWriteErrors(failures)
+
def block_prefetch(self, locator):
    '''
    Initiate a background download of a block.  This assumes that the
    underlying KeepClient implements a block cache, so repeated requests
    for the same block will not result in repeated downloads (unless the
    block is evicted from the cache.)  This method does not block.

    Locators found in self._bufferblocks are ignored.  The first call that
    actually queues a locator also creates the prefetch queue and starts
    two daemon downloader threads.
    '''
    def worker(self):
        '''
        Background downloader thread.

        Takes locators from self._prefetch_queue and fetches each one with
        self._keep.get(), discarding the result.  A None item is the
        shutdown sentinel.
        '''
        while True:
            # Kept outside the try block: if the queue itself is unusable
            # the thread should end rather than spin on the same error.
            b = self._prefetch_queue.get()
            if b is None:
                return
            try:
                self._keep.get(b)
            except Exception:
                # Prefetch is best-effort, so a failed download is ignored.
                # Only Exception is caught so SystemExit and
                # KeyboardInterrupt are not swallowed.
                pass

    if locator in self._bufferblocks:
        return
    if self._prefetch_threads is None:
        self._prefetch_queue = Queue.Queue()
        self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
                                  threading.Thread(target=worker, args=(self,))]
        for t in self._prefetch_threads:
            t.daemon = True
            t.start()
    self._prefetch_queue.put(locator)