@synchronized
def set_state(self, nextstate, val=None):
if (self._state, nextstate) not in self.STATE_TRANSITIONS:
- raise StateChangeError("Invalid state change from %s to %s" % (self.state, nextstate), self.state, nextstate)
+ raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
self._state = nextstate
if self._state == _BufferBlock.PENDING:
for i in xrange(0, self.num_put_threads):
thread = threading.Thread(target=self._commit_bufferblock_worker)
self._put_threads.append(thread)
- thread.daemon = False
+ thread.daemon = True
thread.start()
def _block_prefetch_worker(self):
# Context-manager exit hook: calls self.stop_threads() whenever the `with`
# block is left, whether normally or because an exception was raised.
# exc_type, exc_value and traceback are accepted to satisfy the protocol but
# are not inspected; nothing is returned (None), so an in-flight exception is
# not suppressed and continues to propagate to the caller.
def __exit__(self, exc_type, exc_value, traceback):
self.stop_threads()
- def __del__(self):
- self.stop_threads()
-
def commit_bufferblock(self, block, sync):
"""Initiate a background upload of a bufferblock.
# Mark the block as PENDING to disallow any more appends.
block.set_state(_BufferBlock.PENDING)
except StateChangeError as e:
- if e.state == _BufferBlock.PENDING and sync:
- block.wait_for_commit.wait()
- if block.state() == _BufferBlock.ERROR:
- raise block.error
- return
+ if e.state == _BufferBlock.PENDING:
+ if sync:
+ block.wait_for_commit.wait()
+ else:
+ return
+ if block.state() == _BufferBlock.COMMITTED:
+ return
+ elif block.state() == _BufferBlock.ERROR:
+ raise block.error
+ else:
+ raise
if sync:
try: