7 from arvados import StreamReader, StreamFileReader
9 import arvados_testutil as tutil
10 import run_test_server
12 class StreamRetryTestMixin(object):
# Classes using this mixin must define reader_for(coll_name, **kwargs)
# and read_for_test(reader, byte_count, **kwargs).
15 API_COLLECTIONS = run_test_server.fixture('collections')
17 def keep_client(self):
18 return arvados.KeepClient(proxy='http://[%s]:1' % (tutil.TEST_HOST,),
def manifest_for(self, coll_name):
    # Look up the named entry in API_COLLECTIONS and return the value
    # stored under its 'manifest_text' key.  An unknown name raises
    # KeyError from the lookup itself.
    collection = self.API_COLLECTIONS[coll_name]
    return collection['manifest_text']
def test_success_without_retries(self):
    # Reader built with no retry option; with tutil.mock_responses('bar', 200)
    # active, a 3-byte read is expected to yield 'bar'.
    bar_reader = self.reader_for('bar_file')
    with tutil.mock_responses('bar', 200):
        data = self.read_for_test(bar_reader, 3)
        self.assertEqual('bar', data)
def test_read_no_default_retry(self):
    # No num_retries is given to the reader or to the read call; with a
    # single mocked 500 status the read must raise KeepReadError.
    agreement_reader = self.reader_for('user_agreement')
    with tutil.mock_responses('', 500), \
            self.assertRaises(arvados.errors.KeepReadError):
        self.read_for_test(agreement_reader, 10)
def test_read_with_instance_retries(self):
    # num_retries=3 is set when the reader is constructed.  The mocked
    # status sequence is 500 then 200, and the read must return 'foo'.
    foo_reader = self.reader_for('foo_file', num_retries=3)
    with tutil.mock_responses('foo', 500, 200):
        data = self.read_for_test(foo_reader, 3)
        self.assertEqual('foo', data)
def test_read_with_method_retries(self):
    # Here num_retries=3 is passed to the read call rather than to the
    # reader.  The mocked status sequence is 500 then 200, and the read
    # must return 'foo'.
    foo_reader = self.reader_for('foo_file')
    with tutil.mock_responses('foo', 500, 200):
        data = self.read_for_test(foo_reader, 3, num_retries=3)
        self.assertEqual('foo', data)
def test_read_instance_retries_exhausted(self):
    # Reader constructed with num_retries=3; the mocked status sequence
    # starts with four 500s before a 200.  The read must raise
    # KeepReadError.
    bar_reader = self.reader_for('bar_file', num_retries=3)
    with tutil.mock_responses('bar', 500, 500, 500, 500, 200), \
            self.assertRaises(arvados.errors.KeepReadError):
        self.read_for_test(bar_reader, 3)
def test_read_method_retries_exhausted(self):
    # Same status sequence as the instance-level variant (four 500s, then
    # a 200), but num_retries=3 is supplied on the read call.  The read
    # must raise KeepReadError.
    bar_reader = self.reader_for('bar_file')
    with tutil.mock_responses('bar', 500, 500, 500, 500, 200), \
            self.assertRaises(arvados.errors.KeepReadError):
        self.read_for_test(bar_reader, 3, num_retries=3)
def test_method_retries_take_precedence(self):
    # The reader is built with num_retries=10 but the read call passes
    # num_retries=1.  With three mocked 500s ahead of the 200, the test
    # requires KeepReadError, i.e. the per-call value must be the one
    # that applies.
    agreement_reader = self.reader_for('user_agreement', num_retries=10)
    with tutil.mock_responses('', 500, 500, 500, 200), \
            self.assertRaises(arvados.errors.KeepReadError):
        self.read_for_test(agreement_reader, 10, num_retries=1)
# Applies the StreamRetryTestMixin tests to StreamReader.readfrom, and adds
# a manifest round-trip check that constructs readers without a Keep client.
class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
    def reader_for(self, coll_name, **kwargs):
        # Split the fixture manifest on whitespace and pass the tokens, a
        # Keep client and any extra keyword options to StreamReader.
        tokens = self.manifest_for(coll_name).split()
        return StreamReader(tokens, self.keep_client(), **kwargs)

    def read_for_test(self, reader, byte_count, **kwargs):
        # Reads always start at offset 0.
        start = 0
        return reader.readfrom(start, byte_count, **kwargs)

    def test_manifest_text_without_keep_client(self):
        # Each line of the fixture manifest is tokenized on its own and fed
        # to StreamReader with no Keep client; manifest_text() must give
        # back that same line followed by a newline.
        manifest = self.manifest_for('multilevel_collection_1')
        stream_lines = manifest.rstrip('\n').split('\n')
        for stream_line in stream_lines:
            stream = StreamReader(stream_line.split())
            expected = stream_line + '\n'
            self.assertEqual(expected, stream.manifest_text())
# Applies the StreamRetryTestMixin tests to the read() method of the first
# file object returned by StreamReader.all_files().
class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
    def reader_for(self, coll_name, **kwargs):
        # Build a StreamReader from the whitespace-split fixture manifest,
        # then return the first entry of its all_files() result.
        tokens = self.manifest_for(coll_name).split()
        stream = StreamReader(tokens, self.keep_client(), **kwargs)
        return stream.all_files()[0]

    def read_for_test(self, reader, byte_count, **kwargs):
        # Forward the byte count and any keyword options to read().
        data = reader.read(byte_count, **kwargs)
        return data
# Same tests as StreamFileReadTestCase, but the data is fetched with
# readfrom() starting at offset 0 instead of read().
class StreamFileReadFromTestCase(StreamFileReadTestCase):
    def read_for_test(self, reader, byte_count, **kwargs):
        start = 0
        return reader.readfrom(start, byte_count, **kwargs)
# Same tests as StreamFileReadTestCase, but the data is fetched with
# readall() and the pieces it produces are concatenated.
class StreamFileReadAllTestCase(StreamFileReadTestCase):
    def read_for_test(self, reader, byte_count, **kwargs):
        # byte_count is accepted for interface compatibility but not used.
        pieces = reader.readall(**kwargs)
        return ''.join(pieces)
# Same tests as StreamFileReadTestCase, but the data is fetched with
# readall_decompressed() and the pieces it produces are concatenated.
class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
    def read_for_test(self, reader, byte_count, **kwargs):
        # byte_count is accepted for interface compatibility but not used.
        pieces = reader.readall_decompressed(**kwargs)
        return ''.join(pieces)
# Same tests as StreamFileReadTestCase, but the data is fetched with
# readlines() and the lines it produces are concatenated.
class StreamFileReadlinesTestCase(StreamFileReadTestCase):
    def read_for_test(self, reader, byte_count, **kwargs):
        # byte_count is accepted for interface compatibility but not used.
        lines = reader.readlines(**kwargs)
        return ''.join(lines)
116 if __name__ == '__main__':