X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/68cd78ca1a20d80e0ab90d125df305f30b606f85..22286e8b81fa7644500e197b95e6d6417ed25f7e:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 61ba3e04fd..c46019a0d4 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -245,6 +245,9 @@ class AsyncKeepWriteErrors(Exception): def __init__(self, errors): self.errors = errors + def __repr__(self): + return "\n".join(self.errors) + class BlockManager(object): def __init__(self, keep): self._keep = keep @@ -299,12 +302,11 @@ class BlockManager(object): self._put_queue.task_done() if self._put_threads is None: - self._put_queue = Queue.Queue() + self._put_queue = Queue.Queue(maxsize=2) self._put_errors = Queue.Queue() - self._put_threads = [threading.Thread(target=worker, args=(self,)), - threading.Thread(target=worker, args=(self,))] - self._put_threads[0].start() - self._put_threads[1].start() + self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))] + for t in self._put_threads: + t.start() block.state = BufferBlock.PENDING self._put_queue.put(block) @@ -355,18 +357,17 @@ class BlockManager(object): self._prefetch_queue.put(locator) class ArvadosFile(object): - def __init__(self, parent, stream=[], segments=[], keep=None): + def __init__(self, parent, stream=[], segments=[]): ''' stream: a list of Range objects representing a block stream segments: a list of Range objects representing segments ''' self.parent = parent self._modified = True - self._segments = [] + self.segments = [] for s in segments: self.add_segment(stream, s.range_start, s.range_size) self._current_bblock = None - self._keep = keep def set_unmodified(self): self._modified = False @@ -376,7 +377,7 @@ class ArvadosFile(object): def truncate(self, size): new_segs = [] - for r in self._segments: + for r in self.segments: range_end = r.range_start+r.range_size if r.range_start >= size: # segment is past the trucate size, all done @@ -389,7 +390,7 @@ class ArvadosFile(object): else: new_segs.append(r) - self._segments = new_segs + self.segments = new_segs self._modified = True def readfrom(self, offset, size, num_retries): @@ -397,10 +398,10 @@ class ArvadosFile(object): return '' data = [] - for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE): + for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE): self.parent._my_block_manager().block_prefetch(lr.locator) - for lr in locators_and_ranges(self._segments, offset, size): + for lr in locators_and_ranges(self.segments, offset, size): # TODO: if data is empty, wait on block get, otherwise only # get more data if the block is already in the cache. data.append(self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size]) @@ -412,7 +413,7 @@ class ArvadosFile(object): a previous buffered write). Re-pack the buffer block for efficiency and to avoid leaking information. ''' - segs = self._segments + segs = self.segments # Sum up the segments to get the total bytes of the file referencing # into the buffer block. @@ -451,18 +452,18 @@ class ArvadosFile(object): self._current_bblock = self.parent._my_block_manager().alloc_bufferblock() self._current_bblock.append(data) - replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) + replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) def add_segment(self, blocks, pos, size): self._modified = True for lr in locators_and_ranges(blocks, pos, size): - last = self._segments[-1] if self._segments else Range(0, 0, 0) + last = self.segments[-1] if self.segments else Range(0, 0, 0) r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) - self._segments.append(r) + self.segments.append(r) def size(self): - if self._segments: - n = self._segments[-1] + if self.segments: + n = self.segments[-1] return n.range_start + n.range_size else: return 0