X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/02e9754a68a5816458d517b8f5012530cf17ebba..89796f01a6ea3cb553a61be6ce92883a1decf003:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 1342601d91..1c21d832c0 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -245,16 +245,18 @@ class AsyncKeepWriteErrors(Exception): def __init__(self, errors): self.errors = errors + def __repr__(self): + return "\n".join(self.errors) + class BlockManager(object): def __init__(self, keep): self._keep = keep self._bufferblocks = {} self._put_queue = None self._put_errors = None - self._threads = None - self._continue_worker = True + self._put_threads = None self._prefetch_queue = None - self._prefetch_thread = None + self._prefetch_threads = None def alloc_bufferblock(self, blockid=None, starting_size=2**14): if blockid is None: @@ -263,81 +265,112 @@ class BlockManager(object): self._bufferblocks[bb.blockid] = bb return bb + def stop_threads(self): + if self._put_threads is not None: + for t in self._put_threads: + self._put_queue.put(None) + for t in self._put_threads: + t.join() + self._put_threads = None + self._put_queue = None + self._put_errors = None + + if self._prefetch_threads is not None: + for t in self._prefetch_threads: + self._prefetch_queue.put(None) + for t in self._prefetch_threads: + t.join() + self._prefetch_threads = None + self._prefetch_queue = None + def commit_bufferblock(self, block): def worker(self): - while self._continue_worker: + while True: try: b = self._put_queue.get() - b._locator = self._keep.put(item) + if b is None: + return + b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes()) b.state = BufferBlock.COMMITTED b.buffer_view = None b.buffer_block = None except Exception as e: - self._error.put(e) + print e + self._put_errors.put(e) finally: - self._queue.task_done() + if self._put_queue is not None: + self._put_queue.task_done() - if self._threads is None: - self._put_queue = Queue.Queue() + if self._put_threads is None: + self._put_queue = Queue.Queue(maxsize=2) self._put_errors = Queue.Queue() - self._threads = [threading.Thread(target=worker, args=(self,)), - threading.Thread(target=worker, args=(self,))] + self._put_threads = [threading.Thread(target=worker, args=(self,)), + threading.Thread(target=worker, args=(self,))] + for t in self._put_threads: + t.daemon = True + t.start() block.state = BufferBlock.PENDING self._put_queue.put(block) - def get_block(self, locator, num_retries): + def get_block(self, locator, num_retries, cache_only=False): if locator in self._bufferblocks: bb = self._bufferblocks[locator] if bb.state != BufferBlock.COMMITTED: return bb.buffer_view[0:bb.write_pointer].tobytes() else: locator = bb._locator - return self._keep.get(locator, num_retries=num_retries) + return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only) def commit_all(self): - for k,v in self._bufferblocks: + for k,v in self._bufferblocks.items(): if v.state == BufferBlock.WRITABLE: self.commit_bufferblock(v) - self._put_queue.join() - if not self._errors.empty(): - e = [] - try: - while True: - e.append(self._errors.get(False)) - except Queue.Empty: - pass - raise AsyncKeepWriteErrors(e) + if self._put_queue is not None: + self._put_queue.join() + if not self._put_errors.empty(): + e = [] + try: + while True: + e.append(self._put_errors.get(False)) + except Queue.Empty: + pass + raise AsyncKeepWriteErrors(e) def block_prefetch(self, locator): - def worker(keep, loc): - while self._continue_worker: + def worker(self): + while True: try: b = self._prefetch_queue.get() - keep.get(loc) + if b is None: + return + self._keep.get(b) except: pass if locator in self._bufferblocks: return - if self._prefetch_thread is None: + if self._prefetch_threads is None: self._prefetch_queue = Queue.Queue() - self._prefetch_thread = threading.Thread(target=worker, args=(self,)) + self._prefetch_threads = [threading.Thread(target=worker, args=(self,)), + threading.Thread(target=worker, args=(self,))] + for t in self._prefetch_threads: + t.daemon = True + t.start() self._prefetch_queue.put(locator) class ArvadosFile(object): - def __init__(self, block_manager, stream=[], segments=[], keep=None): + def __init__(self, parent, stream=[], segments=[]): ''' stream: a list of Range objects representing a block stream segments: a list of Range objects representing segments ''' - self.bbm = block_manager + self.parent = parent self._modified = True - self._segments = [] + self.segments = [] for s in segments: - self.add_segment(stream, s.range_start, s.range_size) + self.add_segment(stream, s.locator, s.range_size) self._current_bblock = None - self._keep = keep def set_unmodified(self): self._modified = False @@ -347,7 +380,7 @@ class ArvadosFile(object): def truncate(self, size): new_segs = [] - for r in self._segments: + for r in self.segments: range_end = r.range_start+r.range_size if r.range_start >= size: # segment is past the trucate size, all done @@ -360,23 +393,23 @@ class ArvadosFile(object): else: new_segs.append(r) - self._segments = new_segs + self.segments = new_segs self._modified = True def readfrom(self, offset, size, num_retries): if size == 0 or offset >= self.size(): return '' - if self._keep is None: - self._keep = KeepClient(num_retries=num_retries) data = [] - for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE): - self.bbm.block_prefetch(lr.locator) + for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE): + self.parent._my_block_manager().block_prefetch(lr.locator) - for lr in locators_and_ranges(self._segments, offset, size): - # TODO: if data is empty, wait on block get, otherwise only - # get more data if the block is already in the cache. - data.append(self.bbm.get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size]) + for lr in locators_and_ranges(self.segments, offset, size): + d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data)) + if d: + data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size]) + else: + break return ''.join(data) def _repack_writes(self): @@ -385,7 +418,7 @@ class ArvadosFile(object): a previous buffered write). Re-pack the buffer block for efficiency and to avoid leaking information. ''' - segs = self._segments + segs = self.segments # Sum up the segments to get the total bytes of the file referencing # into the buffer block. @@ -395,7 +428,7 @@ class ArvadosFile(object): if write_total < self._current_bblock.size(): # There is more data in the buffer block than is actually accounted for by segments, so # re-pack into a new buffer by copying over to a new buffer block. - new_bb = self.bbm.alloc_bufferblock(self._current_bblock.blockid, starting_size=write_total) + new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_size=write_total) for t in bufferblock_segs: new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes()) t.segment_offset = new_bb.size() - t.range_size @@ -415,27 +448,27 @@ class ArvadosFile(object): self._modified = True if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE: - self._current_bblock = self.bbm.alloc_bufferblock() + self._current_bblock = self.parent._my_block_manager().alloc_bufferblock() if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: self._repack_writes() if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: - self.bbm.commit_bufferblock(self._current_bblock) - self._current_bblock = self.bbm.alloc_bufferblock() + self.parent._my_block_manager().commit_bufferblock(self._current_bblock) + self._current_bblock = self.parent._my_block_manager().alloc_bufferblock() self._current_bblock.append(data) - replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) + replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) def add_segment(self, blocks, pos, size): self._modified = True for lr in locators_and_ranges(blocks, pos, size): - last = self._segments[-1] if self._segments else Range(0, 0, 0) + last = self.segments[-1] if self.segments else Range(0, 0, 0) r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) - self._segments.append(r) + self.segments.append(r) def size(self): - if self._segments: - n = self._segments[-1] + if self.segments: + n = self.segments[-1] return n.range_start + n.range_size else: return 0