def __init__(self, errors):
    """Store the given collection of error messages for later reporting."""
    self.errors = errors
def __repr__(self):
    # One error message per line keeps multi-error output readable.
    # (Materialized from the '+' post-image lines of a mangled diff hunk.)
    return "\n".join(self.errors)
class BlockManager(object):
    # NOTE(review): everything below is a mangled unified-diff hunk, not
    # runnable Python: '-'/'+' prefixed lines are pre-/post-image text, and
    # statements from several methods whose 'def' lines were elided are
    # fused under __init__. Kept byte-for-byte; annotations only.
    def __init__(self, keep):
        # keep: client used for block storage I/O -- TODO confirm type.
        self._keep = keep
        # NOTE(review): task_done() looks like the tail of a queue-worker
        # loop, not part of __init__ -- its 'def' line was elided.
        self._put_queue.task_done()
        # Fragment of a commit path: lazily create the put queue (bounded
        # to 2 in the post-image, presumably for backpressure) and start
        # two writer threads before queuing the block.
        if self._put_threads is None:
-            self._put_queue = Queue.Queue()
+            self._put_queue = Queue.Queue(maxsize=2)
            self._put_errors = Queue.Queue()
-            self._put_threads = [threading.Thread(target=worker, args=(self,)),
-                                 threading.Thread(target=worker, args=(self,))]
-            self._put_threads[0].start()
-            self._put_threads[1].start()
+            self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))]
+            for t in self._put_threads:
+                t.start()
        # Mark the block pending and hand it to the writer threads.
        block.state = BufferBlock.PENDING
        self._put_queue.put(block)
        return
        # Fragment of a prefetch path: the post-image starts two prefetch
        # threads instead of one.
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
-            self._prefetch_threads = [threading.Thread(target=worker, args=(self,))]
+            self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
+                                      threading.Thread(target=worker, args=(self,))]
            self._prefetch_threads[0].start()
+            self._prefetch_threads[1].start()
        self._prefetch_queue.put(locator)
class ArvadosFile(object):
    def __init__(self, parent, stream=None, segments=None):
        '''
        parent: owning object; must provide _my_block_manager() for the
            read/write paths of this class.
        stream: a list of Range objects representing a block stream
        segments: a list of Range objects representing segments
        '''
        self.parent = parent
        # A freshly constructed file counts as modified until
        # set_unmodified() is called.
        self._modified = True
        self.segments = []
        # Defaults are None instead of [] to avoid the shared
        # mutable-default-argument pitfall; substitute fresh lists here.
        for s in (segments if segments is not None else []):
            self.add_segment(stream if stream is not None else [],
                             s.range_start, s.range_size)
        # Buffer block currently open for appends; allocated lazily.
        self._current_bblock = None
def set_unmodified(self):
self._modified = False
def truncate(self, size):
new_segs = []
- for r in self._segments:
+ for r in self.segments:
range_end = r.range_start+r.range_size
if r.range_start >= size:
# segment is past the trucate size, all done
else:
new_segs.append(r)
- self._segments = new_segs
+ self.segments = new_segs
self._modified = True
    def readfrom(self, offset, size, num_retries):
        # Read up to `size` bytes starting at byte `offset`, passing
        # `num_retries` through to the block fetches.
        # NOTE(review): mangled diff hunk -- '-'/'+' lines are pre-/post-
        # image text, and the final "join data and return" line appears to
        # have been elided, so this text is not runnable as-is.
        if size == 0 or offset >= self.size():
            return ''
-        if self._keep is None:
-            self._keep = KeepClient(num_retries=num_retries)
        data = []
        # First pass: ask the block manager to prefetch a window extending
        # one KEEP_BLOCK_SIZE past the requested range (read-ahead).
-        for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
+        for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE):
            self.parent._my_block_manager().block_prefetch(lr.locator)
        # Second pass: slice each requested sub-range out of its block.
-        for lr in locators_and_ranges(self._segments, offset, size):
+        for lr in locators_and_ranges(self.segments, offset, size):
            # TODO: if data is empty, wait on block get, otherwise only
            # get more data if the block is already in the cache.
            data.append(self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
    # NOTE(review): fragment of at least two methods fused by the diff --
    # a docstring tail plus buffer re-pack logic (presumably a
    # _repack_writes helper) and a write path appending into the current
    # buffer block (presumably writeto). Their 'def' lines and several
    # statements were elided; kept byte-for-byte, annotations only.
    a previous buffered write). Re-pack the buffer block for efficiency
    and to avoid leaking information.
    '''
-        segs = self._segments
+        segs = self.segments
        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()
        # Append the written data and splice its range into the segment
        # list at the write offset.
        self._current_bblock.append(data)
-        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+        replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
def add_segment(self, blocks, pos, size):
self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
- last = self._segments[-1] if self._segments else Range(0, 0, 0)
+ last = self.segments[-1] if self.segments else Range(0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
- self._segments.append(r)
+ self.segments.append(r)
def size(self):
- if self._segments:
- n = self._segments[-1]
+ if self.segments:
+ n = self.segments[-1]
return n.range_start + n.range_size
else:
return 0