+class BufferBlock(object):
+ WRITABLE = 0
+ PENDING = 1
+ COMMITTED = 2
+
+ def __init__(self, blockid, starting_size):
+ self.blockid = blockid
+ self.buffer_block = bytearray(starting_size)
+ self.buffer_view = memoryview(self.buffer_block)
+ self.write_pointer = 0
+ self.state = BufferBlock.WRITABLE
+ self._locator = None
+
+ def append(self, data):
+ if self.state == BufferBlock.WRITABLE:
+ while (self.write_pointer+len(data)) > len(self.buffer_block):
+ new_buffer_block = bytearray(len(self.buffer_block) * 2)
+ new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
+ self.buffer_block = new_buffer_block
+ self.buffer_view = memoryview(self.buffer_block)
+ self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
+ self.write_pointer += len(data)
+ self._locator = None
+ else:
+ raise AssertionError("Buffer block is not writable")
+
+ def size(self):
+ return self.write_pointer
+
+ def locator(self):
+ if self._locator is None:
+ self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
+ return self._locator
+
+class AsyncKeepWriteErrors(Exception):
+ def __init__(self, errors):
+ self.errors = errors
+
+ def __repr__(self):
+ return "\n".join(self.errors)
+
+class BlockManager(object):
+ def __init__(self, keep):
+ self._keep = keep
+ self._bufferblocks = {}
+ self._put_queue = None
+ self._put_errors = None
+ self._put_threads = None
+ self._prefetch_queue = None
+ self._prefetch_threads = None
+
+ def alloc_bufferblock(self, blockid=None, starting_size=2**14):
+ if blockid is None:
+ blockid = "bufferblock%i" % len(self._bufferblocks)
+ bb = BufferBlock(blockid, starting_size=starting_size)
+ self._bufferblocks[bb.blockid] = bb
+ return bb
+
+ def stop_threads(self):
+ if self._put_threads is not None:
+ for t in self._put_threads:
+ self._put_queue.put(None)
+ for t in self._put_threads:
+ t.join()
+ self._put_threads = None
+ self._put_queue = None
+ self._put_errors = None
+
+ if self._prefetch_threads is not None:
+ for t in self._prefetch_threads:
+ self._prefetch_queue.put(None)
+ for t in self._prefetch_threads:
+ t.join()
+ self._prefetch_threads = None
+ self._prefetch_queue = None
+
+ def commit_bufferblock(self, block):
+ def worker(self):
+ while True:
+ try:
+ b = self._put_queue.get()
+ if b is None:
+ return
+ b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
+ b.state = BufferBlock.COMMITTED
+ b.buffer_view = None
+ b.buffer_block = None
+ except Exception as e:
+ print e
+ self._put_errors.put(e)
+ finally:
+ if self._put_queue is not None:
+ self._put_queue.task_done()
+
+ if self._put_threads is None:
+ self._put_queue = Queue.Queue(maxsize=2)
+ self._put_errors = Queue.Queue()
+ self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))]
+ for t in self._put_threads:
+ t.start()
+
+ block.state = BufferBlock.PENDING
+ self._put_queue.put(block)
+
+ def get_block(self, locator, num_retries):
+ if locator in self._bufferblocks:
+ bb = self._bufferblocks[locator]
+ if bb.state != BufferBlock.COMMITTED:
+ return bb.buffer_view[0:bb.write_pointer].tobytes()
+ else:
+ locator = bb._locator
+ return self._keep.get(locator, num_retries=num_retries)
+
+ def commit_all(self):
+ for k,v in self._bufferblocks.items():
+ if v.state == BufferBlock.WRITABLE:
+ self.commit_bufferblock(v)
+ if self._put_queue is not None:
+ self._put_queue.join()
+ if not self._put_errors.empty():
+ e = []
+ try:
+ while True:
+ e.append(self._put_errors.get(False))
+ except Queue.Empty:
+ pass
+ raise AsyncKeepWriteErrors(e)
+
+ def block_prefetch(self, locator):
+ def worker(self):
+ while True:
+ try:
+ b = self._prefetch_queue.get()
+ if b is None:
+ return
+ self._keep.get(b)
+ except:
+ pass
+
+ if locator in self._bufferblocks:
+ return
+ if self._prefetch_threads is None:
+ self._prefetch_queue = Queue.Queue()
+ self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
+ threading.Thread(target=worker, args=(self,))]
+ self._prefetch_threads[0].start()
+ self._prefetch_threads[1].start()
+ self._prefetch_queue.put(locator)
+