self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
+ self._repacked_bb = {}
+ self._repacked_bb_lock = threading.Lock()
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
+ with self._repacked_bb_lock:
+ # Check if this block was created by repacking smaller blocks
+ if bufferblock.blockid in self._repacked_bb:
+ # Update segment locators (with its tokens) of files within
+ # this block
+ old_loc = self._repacked_bb[bufferblock.blockid]['unsigned_loc']
+ for f in self._repacked_bb[bufferblock.blockid]['files']:
+ for s in [x for x in f._segments if x.locator == old_loc]:
+ s.locator = loc
+ del(self._repacked_bb[bufferblock.blockid])
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
finally:
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
files.append((bb, new_bb.write_pointer - bb.size()))
+ # If this repacked block will be committed asynchronously, take note
+ # of its files so their segments' locators will be updated with
+ # the correct permission token returned by the API server.
+ if not sync:
+ with self._repacked_bb_lock:
+ self._repacked_bb[new_bb.blockid] = {
+ 'unsigned_loc': new_bb.locator(),
+ 'files': [bb.owner for bb, _ in files],
+ }
+
self.commit_bufferblock(new_bb, sync=sync)
- for bb, new_bb_segment_offset in files:
- newsegs = bb.owner.segments()
- for s in newsegs:
- if s.locator == bb.blockid:
- s.locator = new_bb.locator()
- s.segment_offset = new_bb_segment_offset+s.segment_offset
- bb.owner.set_segments(newsegs)
- self._delete_bufferblock(bb.blockid)
+ with self._repacked_bb_lock:
+ for bb, new_bb_segment_offset in files:
+ newsegs = bb.owner.segments()
+ for s in newsegs:
+ if s.locator == bb.blockid:
+ s.locator = new_bb.locator()
+ s.segment_offset = new_bb_segment_offset+s.segment_offset
+ bb.owner.set_segments(newsegs)
+ self._delete_bufferblock(bb.blockid)
def commit_bufferblock(self, block, sync):
"""Initiate a background upload of a bufferblock.