-
-
class BufferBlock(object):
    """Growable in-memory byte buffer paired with a locator list entry.

    Attributes:
        locator: identifier given to this block by its creator.
        buffer_block: backing ``bytearray``; only the first ``write_pointer``
            bytes hold appended data, the rest is spare capacity.
        buffer_view: ``memoryview`` over ``buffer_block``, used for writes.
        write_pointer: number of bytes appended so far.
        locator_list_entry: mutable ``[locator, size, streamoffset]`` list;
            ``size`` is kept equal to ``write_pointer`` by ``append``.
    """

    def __init__(self, locator, streamoffset, starting_size=2**16):
        """Create an empty block with ``starting_size`` bytes of capacity.

        ``starting_size`` may be 0; the buffer is grown on demand.
        """
        self.locator = locator
        self.buffer_block = bytearray(starting_size)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self.locator_list_entry = [locator, 0, streamoffset]

    def append(self, data):
        """Copy ``data`` to the end of the buffer, growing it if required.

        Capacity grows by doubling until the data fits.  The target size is
        computed first so that only one new buffer is allocated and copied
        into, however many doublings are needed.
        """
        end = self.write_pointer + len(data)
        capacity = len(self.buffer_block)
        if end > capacity:
            # Doubling zero stays zero forever, so an empty buffer starts
            # growing from a single byte.
            new_capacity = capacity or 1
            while new_capacity < end:
                new_capacity *= 2
            new_buffer_block = bytearray(new_capacity)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)
        self.buffer_view[self.write_pointer:end] = data
        self.write_pointer = end
        # Keep the size field of the locator entry in step with the data.
        self.locator_list_entry[1] = self.write_pointer
-
-
class StreamWriter(StreamReader):
    """StreamReader subclass that buffers appended data in memory.

    Appended bytes are collected in a BufferBlock that is registered as the
    last entry of ``self._data_locators``.  When the block would grow past
    ``config.KEEP_BLOCK_SIZE``, or when ``commit()`` is called, its contents
    are passed to ``self._keep.put`` and the entry's locator is replaced by
    the value that call returns.

    The public entry points ``append`` and ``commit``, and the proxied
    superclass methods listed below, run while holding ``self.mutex``.
    """

    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        """Initialise the reader, then swap its single file for a writer.

        Raises:
            AssertionError: if the stream does not contain exactly one file.
        """
        super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)

        if len(self._files) != 1:
            raise AssertionError("StreamWriter can only have one file at a time")
        sr = self._files.popitem()[1]
        self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)

        self.mutex = threading.Lock()
        self.current_bblock = None
        self.bufferblocks = {}

    # wrap superclass methods in mutex
    def _proxy_method(name):
        method = getattr(StreamReader, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            with self.mutex:
                return method(self, *args, **kwargs)
        return wrapper

    # Writing into locals() inside a class body adds names to the class
    # namespace, so each listed StreamReader method is overridden by a
    # wrapper that takes the mutex first.
    # NOTE(review): threading.Lock is not re-entrant; this assumes the wrapped
    # methods do not call one another through self -- confirm in StreamReader.
    for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
        locals()[_method_name] = _proxy_method(_method_name)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        """Return the bytes for ``locator``.

        Locators naming an in-memory buffer block are served from that block;
        anything else is fetched with ``self._keep.get``.
        """
        if locator in self.bufferblocks:
            bb = self.bufferblocks[locator]
            # bytes() is str on Python 2 and yields the raw bytes on Python 3,
            # where str(bytearray) would give the repr instead.
            return bytes(bb.buffer_block[0:bb.write_pointer])
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def _init_bufferblock(self):
        """Start a new buffer block positioned right after the last data block."""
        last = self._data_locators[-1]
        streamoffset = last[OFFSET] + last[BLOCKSIZE]
        if last[BLOCKSIZE] == 0:
            # Drop a zero-length trailing block; the new block takes its place.
            del self._data_locators[-1]
        self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
        self.bufferblocks[self.current_bblock.locator] = self.current_bblock
        self._data_locators.append(self.current_bblock.locator_list_entry)

    def _repack_writes(self):
        '''Test if the buffer block has more data than is referenced by actual segments
        (this happens when a buffered write over-writes a file range written in
        a previous buffered write).  Re-pack the buffer block for efficiency
        and to avoid leaking information.
        '''
        # The stream holds exactly one file (enforced in __init__).
        the_file = next(iter(self._files.values()))
        segs = the_file.segments
        bb_entry = self.current_bblock.locator_list_entry
        bb_start = bb_entry[OFFSET]

        bufferblock_segs = []
        i = 0
        # Shallow copy: the list can be restructured freely, but the segment
        # lists inside it are still shared with ``segs``.
        tmp_segs = copy.copy(segs)
        while i < len(tmp_segs):
            # Go through each segment and identify segments that include the buffer block
            s = tmp_segs[i]
            if s[LOCATOR] < bb_start and (s[LOCATOR] + s[BLOCKSIZE]) > bb_start:
                # The segment straddles the previous block and the current buffer block.  Split the segment.
                b1 = bb_start - s[LOCATOR]
                b2 = (s[LOCATOR] + s[BLOCKSIZE]) - bb_start
                bb_seg = [bb_start, b2, s[OFFSET]+b1]
                tmp_segs[i] = [s[LOCATOR], b1, s[OFFSET]]
                tmp_segs.insert(i+1, bb_seg)
                bufferblock_segs.append(bb_seg)
                # Step over the half just inserted so it is not counted twice.
                i += 1
            elif s[LOCATOR] >= bb_start:
                # The segment's data is in the buffer block.
                bufferblock_segs.append(s)
            i += 1

        # Now sum up the segments to get the total bytes
        # of the file referencing into the buffer block.
        write_total = sum(s[BLOCKSIZE] for s in bufferblock_segs)

        if write_total < bb_entry[BLOCKSIZE]:
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            old_bb = self.current_bblock
            # Keep at least one byte of capacity so the replacement block can
            # always grow by doubling, even when nothing references the old one.
            new_bb = BufferBlock(old_bb.locator,
                                 bb_start,
                                 starting_size=max(write_total, 1))
            for t in bufferblock_segs:
                t_start = t[LOCATOR] - bb_start
                t_end = t_start + t[BLOCKSIZE]
                # Re-point the segment at its new position before copying.
                t[LOCATOR] = bb_start + new_bb.write_pointer
                new_bb.append(old_bb.buffer_block[t_start:t_end])

            self.current_bblock = new_bb
            self.bufferblocks[self.current_bblock.locator] = self.current_bblock
            self._data_locators[-1] = self.current_bblock.locator_list_entry
            # Only swap in the split segment list when a repack happened.
            the_file.segments = tmp_segs

    def _commit(self):
        """Store the current buffer block and record the locator it is given.

        The caller must ensure ``self.current_bblock`` is not None.
        """
        # commit buffer block

        # TODO: do 'put' in the background?
        pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
        self._data_locators[-1][0] = pdh
        # NOTE(review): the block stays in self.bufferblocks after this;
        # confirm whether anything still needs it once it has been stored.
        self.current_bblock = None

    def commit(self):
        """Repack and store any buffered data; do nothing if none is pending."""
        with self.mutex:
            if self.current_bblock is None:
                # Nothing buffered (no append yet, or already committed).
                return
            self._repack_writes()
            self._commit()

    def _append(self, data):
        """Buffer ``data``, committing the current block first if it is full.

        Raises:
            ArgumentError: if ``data`` is longer than ``config.KEEP_BLOCK_SIZE``.
        """
        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE")

        if self.current_bblock is None:
            self._init_bufferblock()

        if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
            # Repacking may free enough room; only commit if it did not.
            self._repack_writes()
            if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
                self._commit()
                self._init_bufferblock()

        self.current_bblock.append(data)

    def append(self, data):
        """Append ``data`` to the stream while holding the mutex."""
        with self.mutex:
            self._append(data)