From 02e9754a68a5816458d517b8f5012530cf17ebba Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 29 Dec 2014 16:43:04 -0500 Subject: [PATCH] 3198: Start using BlockManager. Needs tests. --- sdk/python/arvados/arvfile.py | 18 ++++++++++-------- sdk/python/arvados/collection.py | 13 ++++++++----- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index b5d8189039..1342601d91 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -212,7 +212,7 @@ class BufferBlock(object): PENDING = 1 COMMITTED = 2 - def __init__(self, blockid, starting_size=2**14): + def __init__(self, blockid, starting_size): self.blockid = blockid self.buffer_block = bytearray(starting_size) self.buffer_view = memoryview(self.buffer_block) @@ -256,8 +256,10 @@ class BlockManager(object): self._prefetch_queue = None self._prefetch_thread = None - def alloc_bufferblock(self): - bb = BufferBlock("bufferblock%i" % len(self._bufferblocks)) + def alloc_bufferblock(self, blockid=None, starting_size=2**14): + if blockid is None: + blockid = "bufferblock%i" % len(self._bufferblocks) + bb = BufferBlock(blockid, starting_size=starting_size) self._bufferblocks[bb.blockid] = bb return bb @@ -282,7 +284,7 @@ class BlockManager(object): threading.Thread(target=worker, args=(self,))] block.state = BufferBlock.PENDING - self._queue.put(block) + self._put_queue.put(block) def get_block(self, locator, num_retries): if locator in self._bufferblocks: @@ -297,7 +299,7 @@ class BlockManager(object): for k,v in self._bufferblocks: if v.state == BufferBlock.WRITABLE: self.commit_bufferblock(v) - self._queue.join() + self._put_queue.join() if not self._errors.empty(): e = [] try: @@ -324,11 +326,12 @@ class BlockManager(object): self._prefetch_queue.put(locator) class ArvadosFile(object): - def __init__(self, stream=[], segments=[], keep=None): + def __init__(self, block_manager, stream=[], segments=[], keep=None): ''' stream: a list of Range objects representing a block stream segments: a list of Range objects representing segments ''' + self.bbm = block_manager self._modified = True self._segments = [] for s in segments: @@ -392,13 +395,12 @@ class ArvadosFile(object): if write_total < self._current_bblock.size(): # There is more data in the buffer block than is actually accounted for by segments, so # re-pack into a new buffer by copying over to a new buffer block. - new_bb = BufferBlock(self._current_bblock.blockid, starting_size=write_total) + new_bb = self.bbm.alloc_bufferblock(self._current_bblock.blockid, starting_size=write_total) for t in bufferblock_segs: new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes()) t.segment_offset = new_bb.size() - t.range_size self._current_bblock = new_bb - self.bbm[self._current_bblock.blockid] = self._current_bblock def writeto(self, offset, data, num_retries): if len(data) == 0: diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 3cbbca2833..85c7ad7f4f 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -6,7 +6,7 @@ import re from collections import deque from stat import * -from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader +from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager from keep import * from .stream import StreamReader, normalize_stream, locator_block_size from .ranges import Range, LocatorAndRange @@ -641,7 +641,7 @@ class ResumableCollectionWriter(CollectionWriter): class Collection(CollectionBase): def __init__(self, manifest_locator_or_text=None, api_client=None, - keep_client=None, num_retries=0): + keep_client=None, num_retries=0, block_manager=None): self._items = None self._api_client = api_client @@ -651,6 +651,9 @@ class Collection(CollectionBase): self._manifest_text = None self._api_response = None + if block_manager is None: + self.block_manager = BlockManager(keep_client) + if manifest_locator_or_text: if re.match(util.keep_locator_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text @@ -753,13 +756,13 @@ class Collection(CollectionBase): # item must be a file if item is None and create: # create new file - item = ArvadosFile(keep=self._keep_client) + item = ArvadosFile(self.block_manager, keep=self._keep_client) self._items[p[0]] = item return item else: if item is None and create: # create new collection - item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries) + item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries, block_manager=self.block_manager) self._items[p[0]] = item del p[0] return item.find("/".join(p), create=create) @@ -973,7 +976,7 @@ def export_manifest(item, stream_name=".", portable_locators=False): for s in v._segments: loc = s.locator if loc.startswith("bufferblock"): - loc = v._bufferblocks[loc].calculate_locator() + loc = v.bbm._bufferblocks[loc].locator() st.append(LocatorAndRange(loc, locator_block_size(loc), s.segment_offset, s.range_size)) stream[k] = st -- 2.30.2