-
-
class BufferBlock(object):
    """An in-memory, growable buffer holding data for one pending Keep block.

    Data is appended sequentially; the backing bytearray grows geometrically
    when it runs out of room.

    locator: temporary name used to refer to this block before its real
        content hash is known (see StreamWriter._commit).
    streamoffset: byte offset of this block within the stream.
    starting_size: initial capacity of the backing bytearray.
    """
    def __init__(self, locator, streamoffset, starting_size=2**16):
        self.locator = locator
        # Backing storage; only the first write_pointer bytes are valid.
        self.buffer_block = bytearray(starting_size)
        # memoryview lets append() write into the bytearray without copying.
        self.buffer_view = memoryview(self.buffer_block)
        # Number of bytes actually written so far.
        self.write_pointer = 0
        # [locator, size, streamoffset] -- this exact list object is shared
        # with the stream's _data_locators, so updating size here updates it
        # there as well.
        self.locator_list_entry = [locator, 0, streamoffset]

    def append(self, data):
        """Append data to the buffer, growing the backing store as needed."""
        required = self.write_pointer + len(data)
        if required > len(self.buffer_block):
            # Pick the final doubled capacity first, then copy exactly once
            # (the previous version re-copied the buffer on every doubling).
            new_size = len(self.buffer_block) * 2
            while new_size < required:
                new_size *= 2
            new_buffer_block = bytearray(new_size)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)
        self.buffer_view[self.write_pointer:required] = data
        self.write_pointer = required
        # Keep the shared locator entry's size field in sync.
        self.locator_list_entry[1] = self.write_pointer
-
-
-class StreamWriter(StreamReader):
- def __init__(self, tokens, keep=None, debug=False, _empty=False,
- num_retries=0):
- super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)
-
- if len(self._files) != 1:
- raise AssertionError("StreamWriter can only have one file at a time")
- sr = self._files.popitem()[1]
- self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)
-
- self.mutex = threading.Lock()
- self.current_bblock = None
- self.bufferblocks = {}
-
    # wrap superclass methods in mutex
    def _proxy_method(name):
        # Build a wrapper that invokes the StreamReader method of the same
        # name while holding self.mutex, so readers and the writer cannot
        # observe the buffered state mid-update.
        method = getattr(StreamReader, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            with self.mutex:
                return method(self, *args, **kwargs)
        return wrapper

    for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
        # NOTE(review): assigning into locals() inside a class body installs
        # the wrapper as a class attribute in CPython, but the language does
        # not guarantee that locals() is writable here -- confirm this is
        # intentional. _proxy_method and _method_name also remain as class
        # attributes afterwards.
        locals()[_method_name] = _proxy_method(_method_name)
-
- @retry_method
- def _keepget(self, locator, num_retries=None):
- if locator in self.bufferblocks:
- bb = self.bufferblocks[locator]
- return str(bb.buffer_block[0:bb.write_pointer])
- else:
- return self._keep.get(locator, num_retries=num_retries)
-
- def _init_bufferblock(self):
- last = self._data_locators[-1]
- streamoffset = last[OFFSET] + last[BLOCKSIZE]
- if last[BLOCKSIZE] == 0:
- del self._data_locators[-1]
- self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
- self.bufferblocks[self.current_bblock.locator] = self.current_bblock
- self._data_locators.append(self.current_bblock.locator_list_entry)
-
- def _commit(self):
- # commit buffer block
-
- segs = self._files.values()[0].segments
- print "segs %s bb %s" % (segs, self.current_bblock.locator_list_entry)
- final_writes = [s for s in segs if s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]]
- print "final_writes %s" % final_writes
- # if size of final_writes < size of buffer block ...
-
- # TODO: do 'put' in the background?
- pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
- self._data_locators[-1][0] = pdh
- self.current_bblock = None
-
- def commit(self):
- with self.mutex:
- self._commit()
-
- def _append(self, data):
- if len(data) > config.KEEP_BLOCK_SIZE:
- raise ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE")
-
- if self.current_bblock is None:
- self._init_bufferblock()
-
- if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
- self._commit()
- self._init_bufferblock()
-
- self.current_bblock.append(data)
-
- def append(self, data):
- with self.mutex:
- self._append(data)