-
-
class BufferBlock(object):
    """Fixed-capacity in-memory buffer holding data appended to a stream.

    The buffer is allocated once, at ``config.KEEP_BLOCK_SIZE`` bytes, and is
    filled sequentially by :meth:`append`.

    Attributes:
      locator: identifier this buffer is known by.
      buffer_block: the backing ``bytearray``.
      buffer_view: ``memoryview`` over ``buffer_block`` used for writes.
      write_pointer: number of bytes written so far (next write position).
      locator_list_entry: mutable ``[locator, bytes_written, streamoffset]``
        list; its middle element is kept equal to ``write_pointer``.
    """

    def __init__(self, locator, streamoffset):
        """Create an empty buffer.

        Arguments:
          locator: identifier to store on the block and in its list entry.
          streamoffset: value stored as the third element of
            ``locator_list_entry``.
        """
        self.locator = locator
        self.buffer_block = bytearray(config.KEEP_BLOCK_SIZE)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        # [locator, bytes written so far, stream offset].
        # NOTE(review): presumably indexed elsewhere by the module's
        # LOCATOR/BLOCKSIZE/OFFSET constants -- confirm.
        self.locator_list_entry = [locator, 0, streamoffset]

    def append(self, data):
        """Copy ``data`` into the buffer at the current write position.

        Arguments:
          data: bytes-like object to copy.

        Raises:
          ValueError: if ``data`` does not fit in the remaining capacity.
            The buffer and its bookkeeping are left unchanged in that case.
        """
        size = len(data)
        end = self.write_pointer + size
        capacity = len(self.buffer_block)
        if end > capacity:
            # Without this check the slice below is silently clipped to the
            # buffer end and the assignment fails with an opaque size-mismatch
            # ValueError; raise the same exception type with a useful message.
            raise ValueError(
                "cannot append %d bytes to buffer block %r: "
                "only %d of %d bytes free"
                % (size, self.locator, capacity - self.write_pointer, capacity))
        self.buffer_view[self.write_pointer:end] = data
        self.write_pointer = end
        self.locator_list_entry[1] = end
-
-
class StreamWriter(StreamReader):
    """StreamReader over a single-file stream that also accepts appended data.

    Appended data is staged in in-memory ``BufferBlock`` objects, registered
    in ``self.bufferblocks`` under placeholder locators of the form
    ``"bufferblock<N>"`` and listed in ``self._data_locators``.  Selected
    ``StreamReader`` methods are re-exposed wrapped in ``self.mutex``.
    """

    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        """Initialise the reader, then replace its only file with a writer.

        Arguments are forwarded unchanged to ``StreamReader.__init__``.

        Raises:
          AssertionError: if the stream does not contain exactly one file.
        """
        super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)

        if len(self._files) != 1:
            raise AssertionError("StreamWriter can only have one file at a time")

        # Create the writer state before passing `self` to StreamFileWriter,
        # so the object it receives is never half-initialised.
        self.mutex = threading.Lock()
        self.current_bblock = None
        self.bufferblocks = {}

        sr = self._files.popitem()[1]
        self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)

    # Wrap superclass methods in the mutex.
    def _proxy_method(name):
        """Return a wrapper that runs ``StreamReader.<name>`` under self.mutex."""
        method = getattr(StreamReader, name)

        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            with self.mutex:
                return method(self, *args, **kwargs)
        return wrapper

    # Bound explicitly rather than by writing through locals(), whose
    # contents the language reference says should not be modified.
    # NOTE(review): threading.Lock is not reentrant; a proxied method that
    # calls another proxied method via `self` would deadlock -- confirm the
    # superclass implementations do not do so.
    files = _proxy_method('files')
    all_files = _proxy_method('all_files')
    size = _proxy_method('size')
    locators_and_ranges = _proxy_method('locators_and_ranges')
    readfrom = _proxy_method('readfrom')
    manifest_text = _proxy_method('manifest_text')

    @retry_method
    def _keepget(self, locator, num_retries=None):
        """Return the data for ``locator``.

        Locators naming an in-memory buffer block are served from that
        buffer (only the bytes written so far); anything else is fetched
        through ``self._keep.get``.
        """
        bb = self.bufferblocks.get(locator)
        if bb is None:
            return self._keep.get(locator, num_retries=num_retries)
        # tobytes() yields a byte string on both Python 2 and 3 with a single
        # copy; str(bytearray) would return the repr text on Python 3.
        return bb.buffer_view[0:bb.write_pointer].tobytes()

    def _append(self, data):
        """Append ``data`` to the current buffer block, creating it if needed.

        Callers are expected to hold ``self.mutex`` (see ``append``).
        """
        if self.current_bblock is None:
            if self._data_locators:
                # New block starts where the last listed block ends.
                last = self._data_locators[-1]
                streamoffset = last[OFFSET] + last[BLOCKSIZE]
            else:
                # No data blocks yet: the new block starts the stream.
                streamoffset = 0
            self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
            self.bufferblocks[self.current_bblock.locator] = self.current_bblock
            # The list entry is shared with the block, so later appends
            # update the recorded size in place.
            self._data_locators.append(self.current_bblock.locator_list_entry)
        self.current_bblock.append(data)

    def append(self, data):
        """Append ``data`` to the stream while holding ``self.mutex``."""
        with self.mutex:
            self._append(data)