if self._put_threads is None:
self._put_queue = Queue.Queue(maxsize=2)
self._put_errors = Queue.Queue()
- self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))]
+ self._put_threads = [threading.Thread(target=worker, args=(self,)),
+ threading.Thread(target=worker, args=(self,))]
for t in self._put_threads:
+ t.daemon = True
t.start()
block.state = BufferBlock.PENDING
self._put_queue.put(block)
- def get_block(self, locator, num_retries):
+ def get_block(self, locator, num_retries, cache_only=False):
if locator in self._bufferblocks:
bb = self._bufferblocks[locator]
if bb.state != BufferBlock.COMMITTED:
return bb.buffer_view[0:bb.write_pointer].tobytes()
else:
locator = bb._locator
- return self._keep.get(locator, num_retries=num_retries)
+ return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
def commit_all(self):
for k,v in self._bufferblocks.items():
self._prefetch_queue = Queue.Queue()
self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
threading.Thread(target=worker, args=(self,))]
- self._prefetch_threads[0].start()
- self._prefetch_threads[1].start()
+ for t in self._prefetch_threads:
+ t.daemon = True
+ t.start()
self._prefetch_queue.put(locator)
class ArvadosFile(object):
self._modified = True
self.segments = []
for s in segments:
- self.add_segment(stream, s.range_start, s.range_size)
+ self.add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
def set_unmodified(self):
self.parent._my_block_manager().block_prefetch(lr.locator)
for lr in locators_and_ranges(self.segments, offset, size):
- # TODO: if data is empty, wait on block get, otherwise only
- # get more data if the block is already in the cache.
- data.append(self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+ if d:
+ data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ else:
+ break
return ''.join(data)
def _repack_writes(self):