DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep):
+ def __init__(self, keep, copies=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = {}
self.prefetch_enabled = True
self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.copies = copies
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if bufferblock is None:
return
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ if self.copies is None:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
if sync:
try:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ if self.copies is None:
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)