From 89796f01a6ea3cb553a61be6ce92883a1decf003 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Sat, 3 Jan 2015 23:25:22 -0500 Subject: [PATCH] 3198: Apply StreamFileReader tests to ArvadosFileReader --- sdk/python/arvados/arvfile.py | 23 +++++--- sdk/python/arvados/keep.py | 52 ++++++++++++----- sdk/python/arvados/ranges.py | 4 +- sdk/python/tests/test_arvfile.py | 99 +++++++++++++++++++++++++++++++- 4 files changed, 149 insertions(+), 29 deletions(-) diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index c46019a0d4..1c21d832c0 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -304,21 +304,23 @@ class BlockManager(object): if self._put_threads is None: self._put_queue = Queue.Queue(maxsize=2) self._put_errors = Queue.Queue() - self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))] + self._put_threads = [threading.Thread(target=worker, args=(self,)), + threading.Thread(target=worker, args=(self,))] for t in self._put_threads: + t.daemon = True t.start() block.state = BufferBlock.PENDING self._put_queue.put(block) - def get_block(self, locator, num_retries): + def get_block(self, locator, num_retries, cache_only=False): if locator in self._bufferblocks: bb = self._bufferblocks[locator] if bb.state != BufferBlock.COMMITTED: return bb.buffer_view[0:bb.write_pointer].tobytes() else: locator = bb._locator - return self._keep.get(locator, num_retries=num_retries) + return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only) def commit_all(self): for k,v in self._bufferblocks.items(): @@ -352,8 +354,9 @@ class BlockManager(object): self._prefetch_queue = Queue.Queue() self._prefetch_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))] - self._prefetch_threads[0].start() - self._prefetch_threads[1].start() + for t in self._prefetch_threads: + t.daemon = True + t.start() self._prefetch_queue.put(locator) class ArvadosFile(object): @@ -366,7 +369,7 @@ class ArvadosFile(object): self._modified = True self.segments = [] for s in segments: - self.add_segment(stream, s.range_start, s.range_size) + self.add_segment(stream, s.locator, s.range_size) self._current_bblock = None def set_unmodified(self): @@ -402,9 +405,11 @@ class ArvadosFile(object): self.parent._my_block_manager().block_prefetch(lr.locator) for lr in locators_and_ranges(self.segments, offset, size): - # TODO: if data is empty, wait on block get, otherwise only - # get more data if the block is already in the cache. - data.append(self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size]) + d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data)) + if d: + data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size]) + else: + break return ''.join(data) def _repack_writes(self): diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 36ec56ce05..a087838d26 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -186,24 +186,34 @@ class KeepBlockCache(object): break sm = sum([slot.size() for slot in self._cache]) + def _get(self, locator): + # Test if the locator is already in the cache + for i in xrange(0, len(self._cache)): + if self._cache[i].locator == locator: + n = self._cache[i] + if i != 0: + # move it to the front + del self._cache[i] + self._cache.insert(0, n) + return n + return None + + def get(self, locator): + with self._cache_lock: + return self._get(locator) + def reserve_cache(self, locator): '''Reserve a cache slot for the specified locator, or return the existing slot.''' with self._cache_lock: - # Test if the locator is already in the cache - for i in xrange(0, len(self._cache)): - if self._cache[i].locator == locator: - n = self._cache[i] - if i != 0: - # move it to the front - del self._cache[i] - self._cache.insert(0, n) - return n, False - - # Add a new cache slot for the locator - n = KeepBlockCache.CacheSlot(locator) - self._cache.insert(0, n) - return n, True + n = self._get(locator) + if n: + return n, False + else: + # Add a new cache slot for the locator + n = KeepBlockCache.CacheSlot(locator) + self._cache.insert(0, n) + return n, True class KeepClient(object): @@ -576,7 +586,7 @@ class KeepClient(object): return None @retry.retry_method - def get(self, loc_s, num_retries=None): + def get(self, loc_s, num_retries=None, cache_only=False): """Get data from Keep. This method fetches one or more blocks of data from Keep. It @@ -595,12 +605,21 @@ class KeepClient(object): to fetch data from every available Keep service, along with any that are named in location hints in the locator. The default value is set when the KeepClient is initialized. + * cache_only: If true, return the block data only if already present in + cache, otherwise return None. """ if ',' in loc_s: return ''.join(self.get(x) for x in loc_s.split(',')) locator = KeepLocator(loc_s) expect_hash = locator.md5sum + if cache_only: + slot = self.block_cache.get(expect_hash) + if slot.ready.is_set(): + return slot.get() + else: + return None + slot, first = self.block_cache.reserve_cache(expect_hash) if not first: v = slot.get() @@ -741,3 +760,6 @@ class KeepClient(object): return '' with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: return f.read() + + def is_cached(self, locator): + return self.block_cache.reserve_cache(expect_hash) diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py index 8c377e26e7..eeb37e2978 100644 --- a/sdk/python/arvados/ranges.py +++ b/sdk/python/arvados/ranges.py @@ -6,7 +6,7 @@ class Range(object): self.segment_offset = segment_offset def __repr__(self): - return "[\"%s\", %i, %i, %i]" % (self.locator, self.range_start, self.range_size, self.segment_offset) + return "Range(\"%s\", %i, %i, %i)" % (self.locator, self.range_start, self.range_size, self.segment_offset) def first_block(data_locators, range_start, range_size, debug=False): block_start = 0L @@ -55,7 +55,7 @@ class LocatorAndRange(object): self.segment_size == other.segment_size) def __repr__(self): - return "[\"%s\", %i, %i, %i]" % (self.locator, self.block_size, self.segment_offset, self.segment_size) + return "LocatorAndRange(\"%s\", %i, %i, %i)" % (self.locator, self.block_size, self.segment_offset, self.segment_size) def locators_and_ranges(data_locators, range_start, range_size, debug=False): ''' diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py index 5bc54f5f0c..91cfc79982 100644 --- a/sdk/python/tests/test_arvfile.py +++ b/sdk/python/tests/test_arvfile.py @@ -9,17 +9,17 @@ import unittest import hashlib import arvados -from arvados import StreamReader, StreamFileReader, Range, import_manifest, export_manifest +from arvados import ArvadosFile, ArvadosFileReader, Range, import_manifest, export_manifest import arvados_testutil as tutil - +from test_stream import StreamFileReaderTestCase class ArvadosFileWriterTestCase(unittest.TestCase): class MockKeep(object): def __init__(self, blocks): self.blocks = blocks self.requests = [] - def get(self, locator, num_retries=0): + def get(self, locator, num_retries=0, cache_only=False): self.requests.append(locator) return self.blocks.get(locator) def put(self, data): @@ -337,3 +337,96 @@ class ArvadosFileWriterTestCase(unittest.TestCase): r = c.open("count.txt", "r") self.assertEqual("0123", r.read(4)) self.assertEqual(["2e9ec317e197819358fbc43afca7d837+8", "2e9ec317e197819358fbc43afca7d837+8", "e8dc4081b13434b45189a720b77b6818+8"], keep.requests) + + +class ArvadosFileReaderTestCase(StreamFileReaderTestCase): + class MockParent(object): + class MockBlockMgr(object): + def __init__(self, blocks, nocache): + self.blocks = blocks + self.nocache = nocache + + def block_prefetch(self, loc): + pass + + def get_block(self, loc, num_retries=0, cache_only=False): + if self.nocache and cache_only: + return None + return self.blocks[loc] + + def __init__(self, blocks, nocache): + self.blocks = blocks + self.nocache = nocache + + def _my_block_manager(self): + return ArvadosFileReaderTestCase.MockParent.MockBlockMgr(self.blocks, self.nocache) + + def make_count_reader(self, nocache=False): + stream = [] + n = 0 + blocks = {} + for d in ['01234', '34567', '67890']: + loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d)) + blocks[loc] = d + stream.append(Range(loc, n, len(d))) + n += len(d) + af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)]) + return ArvadosFileReader(af, "count.txt") + + def test_read_returns_first_block(self): + # read() calls will be aligned on block boundaries - see #3663. + sfile = self.make_count_reader(nocache=True) + self.assertEqual('123', sfile.read(10)) + + def test_successive_reads(self): + sfile = self.make_count_reader(nocache=True) + for expect in ['123', '456', '789', '']: + self.assertEqual(expect, sfile.read(10)) + + def test_tell_after_block_read(self): + sfile = self.make_count_reader(nocache=True) + sfile.read(5) + self.assertEqual(3, sfile.tell()) + +# class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin): +# def reader_for(self, coll_name, **kwargs): +# return StreamReader(self.manifest_for(coll_name).split(), +# self.keep_client(), **kwargs) + +# def read_for_test(self, reader, byte_count, **kwargs): +# return reader.readfrom(0, byte_count, **kwargs) + +# def test_manifest_text_without_keep_client(self): +# mtext = self.manifest_for('multilevel_collection_1') +# for line in mtext.rstrip('\n').split('\n'): +# reader = StreamReader(line.split()) +# self.assertEqual(line + '\n', reader.manifest_text()) + + +# class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin): +# def reader_for(self, coll_name, **kwargs): +# return StreamReader(self.manifest_for(coll_name).split(), +# self.keep_client(), **kwargs).all_files()[0] + +# def read_for_test(self, reader, byte_count, **kwargs): +# return reader.read(byte_count, **kwargs) + + +# class StreamFileReadFromTestCase(StreamFileReadTestCase): +# def read_for_test(self, reader, byte_count, **kwargs): +# return reader.readfrom(0, byte_count, **kwargs) + + +# class StreamFileReadAllTestCase(StreamFileReadTestCase): +# def read_for_test(self, reader, byte_count, **kwargs): +# return ''.join(reader.readall(**kwargs)) + + +# class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase): +# def read_for_test(self, reader, byte_count, **kwargs): +# return ''.join(reader.readall_decompressed(**kwargs)) + + +# class StreamFileReadlinesTestCase(StreamFileReadTestCase): +# def read_for_test(self, reader, byte_count, **kwargs): +# return ''.join(reader.readlines(**kwargs)) -- 2.30.2