From 1f354e0ba1b5b23c2d36c0cb60451260b29e1d3f Mon Sep 17 00:00:00 2001 From: Brett Smith Date: Fri, 12 Sep 2014 09:28:47 -0400 Subject: [PATCH] 3147: Add retry support to PySDK StreamReader classes. --- sdk/python/arvados/keep.py | 7 +- sdk/python/arvados/stream.py | 65 +++++++++++-------- sdk/python/tests/test_collections.py | 5 +- sdk/python/tests/test_stream.py | 95 +++++++++++++++++++++++++++- 4 files changed, 141 insertions(+), 31 deletions(-) diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 9522976025..323251d3e9 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -707,7 +707,10 @@ class KeepClient(object): "Write fail for %s: wanted %d but wrote %d" % (data_hash, copies, thread_limiter.done())) - def local_store_put(self, data): + # Local storage methods need no-op num_retries arguments to keep + # integration tests happy. With better isolation they could + # probably be removed again. + def local_store_put(self, data, num_retries=0): md5 = hashlib.md5(data).hexdigest() locator = '%s+%d' % (md5, len(data)) with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f: @@ -716,7 +719,7 @@ class KeepClient(object): os.path.join(self.local_store, md5)) return locator - def local_store_get(self, loc_s): + def local_store_get(self, loc_s, num_retries=0): try: locator = KeepLocator(loc_s) except ValueError: diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py index b98937fe64..04b6b81804 100644 --- a/sdk/python/arvados/stream.py +++ b/sdk/python/arvados/stream.py @@ -18,6 +18,7 @@ import time import threading import collections +from arvados.retry import retry_method from keep import * import config import errors @@ -108,6 +109,7 @@ class StreamFileReader(object): self.segments = segments self._name = name self._filepos = 0L + self.num_retries = stream.num_retries def name(self): return self._name @@ -128,7 +130,8 @@ class StreamFileReader(object): n = self.segments[-1] return n[OFFSET] + n[BLOCKSIZE] - def read(self, size): + @retry_method + def read(self, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at the current file position""" if size == 0: return '' @@ -137,52 +140,58 @@ class StreamFileReader(object): available_chunks = locators_and_ranges(self.segments, self._filepos, size) if available_chunks: locator, blocksize, segmentoffset, segmentsize = available_chunks[0] - data = self._stream.readfrom(locator+segmentoffset, segmentsize) + data = self._stream.readfrom(locator+segmentoffset, segmentsize, + num_retries=num_retries) self._filepos += len(data) return data - def readfrom(self, start, size): + @retry_method + def readfrom(self, start, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at 'start'""" if size == 0: return '' - data = '' + data = [] for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size): - data += self._stream.readfrom(locator+segmentoffset, segmentsize) - return data + data.append(self._stream.readfrom(locator+segmentoffset, segmentsize, + num_retries=num_retries)) + return ''.join(data) - def readall(self, size=2**20): + @retry_method + def readall(self, size=2**20, num_retries=None): while True: - data = self.read(size) + data = self.read(size, num_retries=num_retries) if data == '': break yield data - def decompress(self, decompress, size): - for segment in self.readall(size): + @retry_method + def decompress(self, decompress, size, num_retries=None): + for segment in self.readall(size, num_retries): data = decompress(segment) if data and data != '': yield data - def readall_decompressed(self, size=2**20): + @retry_method + def readall_decompressed(self, size=2**20, num_retries=None): self.seek(0) if re.search('\.bz2$', self._name): dc = bz2.BZ2Decompressor() - return self.decompress(lambda segment: dc.decompress(segment), size) + return self.decompress(dc.decompress, size, + num_retries=num_retries) elif re.search('\.gz$', self._name): dc = zlib.decompressobj(16+zlib.MAX_WBITS) - return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size) + return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), + size, num_retries=num_retries) else: - return self.readall(size) + return self.readall(size, num_retries=num_retries) - def readlines(self, decompress=True): - if decompress: - datasource = self.readall_decompressed() - else: - datasource = self.readall() + @retry_method + def readlines(self, decompress=True, num_retries=None): + read_func = self.readall_decompressed if decompress else self.readall data = '' - for newdata in datasource: + for newdata in read_func(num_retries=num_retries): data += newdata sol = 0 while True: @@ -201,12 +210,15 @@ class StreamFileReader(object): manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments]) return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text() + class StreamReader(object): - def __init__(self, tokens, keep=None, debug=False, _empty=False): + def __init__(self, tokens, keep=None, debug=False, _empty=False, + num_retries=0): self._stream_name = None self._data_locators = [] self._files = collections.OrderedDict() self._keep = keep + self.num_retries = num_retries streamoffset = 0L @@ -254,16 +266,17 @@ class StreamReader(object): def locators_and_ranges(self, range_start, range_size): return locators_and_ranges(self._data_locators, range_start, range_size) - def readfrom(self, start, size): + @retry_method + def readfrom(self, start, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at 'start'""" if size == 0: return '' if self._keep is None: - self._keep = KeepClient() - data = '' + self._keep = KeepClient(num_retries=self.num_retries) + data = [] for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size): - data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize] - return data + data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize]) + return ''.join(data) def manifest_text(self, strip=False): manifest_text = [self.name().replace(' ', '\\040')] diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py index 284854b31d..f066c5d45e 100644 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@ -349,8 +349,9 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, class MockStreamReader(object): def __init__(self, content): self.content = content + self.num_retries = 0 - def readfrom(self, start, size): + def readfrom(self, start, size, num_retries=0): return self.content[start:start+size] def test_file_stream(self): @@ -422,7 +423,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers, def __init__(self, content, num_retries=0): self.content = content - def get(self, locator): + def get(self, locator, num_retries=0): return self.content[locator] def test_stream_reader(self): diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py index 060a26a8ac..3970d672a6 100644 --- a/sdk/python/tests/test_stream.py +++ b/sdk/python/tests/test_stream.py @@ -1,18 +1,82 @@ #!/usr/bin/env python +import mock import unittest import arvados from arvados import StreamReader, StreamFileReader +import arvados_testutil as tutil import run_test_server -class StreamReaderTestCase(unittest.TestCase): +class StreamRetryTestMixin(object): + # Define reader_for(coll_name, **kwargs) + # and read_for_test(reader, size, **kwargs). API_COLLECTIONS = run_test_server.fixture('collections') + def keep_client(self): + return arvados.KeepClient(proxy='http://[%s]:1' % (tutil.TEST_HOST,), + local_store='') + def manifest_for(self, coll_name): return self.API_COLLECTIONS[coll_name]['manifest_text'] + @tutil.skip_sleep + def test_success_without_retries(self): + reader = self.reader_for('bar_file') + with tutil.mock_responses('bar', 200): + self.assertEqual('bar', self.read_for_test(reader, 3)) + + @tutil.skip_sleep + def test_read_no_default_retry(self): + reader = self.reader_for('user_agreement') + with tutil.mock_responses('', 500): + with self.assertRaises(arvados.errors.KeepReadError): + self.read_for_test(reader, 10) + + @tutil.skip_sleep + def test_read_with_instance_retries(self): + reader = self.reader_for('foo_file', num_retries=3) + with tutil.mock_responses('foo', 500, 200): + self.assertEqual('foo', self.read_for_test(reader, 3)) + + @tutil.skip_sleep + def test_read_with_method_retries(self): + reader = self.reader_for('foo_file') + with tutil.mock_responses('foo', 500, 200): + self.assertEqual('foo', + self.read_for_test(reader, 3, num_retries=3)) + + @tutil.skip_sleep + def test_read_instance_retries_exhausted(self): + reader = self.reader_for('bar_file', num_retries=3) + with tutil.mock_responses('bar', 500, 500, 500, 500, 200): + with self.assertRaises(arvados.errors.KeepReadError): + self.read_for_test(reader, 3) + + @tutil.skip_sleep + def test_read_method_retries_exhausted(self): + reader = self.reader_for('bar_file') + with tutil.mock_responses('bar', 500, 500, 500, 500, 200): + with self.assertRaises(arvados.errors.KeepReadError): + self.read_for_test(reader, 3, num_retries=3) + + @tutil.skip_sleep + def test_method_retries_take_precedence(self): + reader = self.reader_for('user_agreement', num_retries=10) + with tutil.mock_responses('', 500, 500, 500, 200): + with self.assertRaises(arvados.errors.KeepReadError): + self.read_for_test(reader, 10, num_retries=1) + + +class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin): + def reader_for(self, coll_name, **kwargs): + return StreamReader(self.manifest_for(coll_name).split(), + self.keep_client(), **kwargs) + + def read_for_test(self, reader, byte_count, **kwargs): + return reader.readfrom(0, byte_count, **kwargs) + def test_manifest_text_without_keep_client(self): mtext = self.manifest_for('multilevel_collection_1') for line in mtext.rstrip('\n').split('\n'): @@ -20,5 +84,34 @@ class StreamReaderTestCase(unittest.TestCase): self.assertEqual(line + '\n', reader.manifest_text()) +class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin): + def reader_for(self, coll_name, **kwargs): + return StreamReader(self.manifest_for(coll_name).split(), + self.keep_client(), **kwargs).all_files()[0] + + def read_for_test(self, reader, byte_count, **kwargs): + return reader.read(byte_count, **kwargs) + + +class StreamFileReadFromTestCase(StreamFileReadTestCase): + def read_for_test(self, reader, byte_count, **kwargs): + return reader.readfrom(0, byte_count, **kwargs) + + +class StreamFileReadAllTestCase(StreamFileReadTestCase): + def read_for_test(self, reader, byte_count, **kwargs): + return ''.join(reader.readall(**kwargs)) + + +class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase): + def read_for_test(self, reader, byte_count, **kwargs): + return ''.join(reader.readall_decompressed(**kwargs)) + + +class StreamFileReadlinesTestCase(StreamFileReadTestCase): + def read_for_test(self, reader, byte_count, **kwargs): + return ''.join(reader.readlines(**kwargs)) + + if __name__ == '__main__': unittest.main() -- 2.30.2