cache_pos, cache_data = self._readline_cache
if self.tell() == cache_pos:
data = [cache_data]
+ self._filepos += len(cache_data)
else:
data = ['']
data_size = len(data[-1])
except ValueError:
nextline_index = len(data)
nextline_index = min(nextline_index, size)
+ self._filepos -= len(data) - nextline_index
self._readline_cache = (self.tell(), data[nextline_index:])
return data[:nextline_index]
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep):
+ def __init__(self, keep, copies=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = {}
self.prefetch_enabled = True
self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.copies = copies
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if bufferblock is None:
return
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ if self.copies is None:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
if sync:
try:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ if self.copies is None:
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)