Merge branch '3187-pipeline-instance-page' into 3605-improved-dashboard
[arvados.git] / sdk / python / tests / test_stream.py
1 #!/usr/bin/env python
2
3 import mock
4 import unittest
5
6 import arvados
7 from arvados import StreamReader, StreamFileReader
8
9 import arvados_testutil as tutil
10 import run_test_server
11
12 class StreamRetryTestMixin(object):
13     # Define reader_for(coll_name, **kwargs)
14     # and read_for_test(reader, size, **kwargs).
15     API_COLLECTIONS = run_test_server.fixture('collections')
16
17     def keep_client(self):
18         return arvados.KeepClient(proxy='http://[%s]:1' % (tutil.TEST_HOST,),
19                                   local_store='')
20
21     def manifest_for(self, coll_name):
22         return self.API_COLLECTIONS[coll_name]['manifest_text']
23
24     @tutil.skip_sleep
25     def test_success_without_retries(self):
26         reader = self.reader_for('bar_file')
27         with tutil.mock_responses('bar', 200):
28             self.assertEqual('bar', self.read_for_test(reader, 3))
29
30     @tutil.skip_sleep
31     def test_read_no_default_retry(self):
32         reader = self.reader_for('user_agreement')
33         with tutil.mock_responses('', 500):
34             with self.assertRaises(arvados.errors.KeepReadError):
35                 self.read_for_test(reader, 10)
36
37     @tutil.skip_sleep
38     def test_read_with_instance_retries(self):
39         reader = self.reader_for('foo_file', num_retries=3)
40         with tutil.mock_responses('foo', 500, 200):
41             self.assertEqual('foo', self.read_for_test(reader, 3))
42
43     @tutil.skip_sleep
44     def test_read_with_method_retries(self):
45         reader = self.reader_for('foo_file')
46         with tutil.mock_responses('foo', 500, 200):
47             self.assertEqual('foo',
48                              self.read_for_test(reader, 3, num_retries=3))
49
50     @tutil.skip_sleep
51     def test_read_instance_retries_exhausted(self):
52         reader = self.reader_for('bar_file', num_retries=3)
53         with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
54             with self.assertRaises(arvados.errors.KeepReadError):
55                 self.read_for_test(reader, 3)
56
57     @tutil.skip_sleep
58     def test_read_method_retries_exhausted(self):
59         reader = self.reader_for('bar_file')
60         with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
61             with self.assertRaises(arvados.errors.KeepReadError):
62                 self.read_for_test(reader, 3, num_retries=3)
63
64     @tutil.skip_sleep
65     def test_method_retries_take_precedence(self):
66         reader = self.reader_for('user_agreement', num_retries=10)
67         with tutil.mock_responses('', 500, 500, 500, 200):
68             with self.assertRaises(arvados.errors.KeepReadError):
69                 self.read_for_test(reader, 10, num_retries=1)
70
71
72 class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
73     def reader_for(self, coll_name, **kwargs):
74         return StreamReader(self.manifest_for(coll_name).split(),
75                             self.keep_client(), **kwargs)
76
77     def read_for_test(self, reader, byte_count, **kwargs):
78         return reader.readfrom(0, byte_count, **kwargs)
79
80     def test_manifest_text_without_keep_client(self):
81         mtext = self.manifest_for('multilevel_collection_1')
82         for line in mtext.rstrip('\n').split('\n'):
83             reader = StreamReader(line.split())
84             self.assertEqual(line + '\n', reader.manifest_text())
85
86
87 class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
88     def reader_for(self, coll_name, **kwargs):
89         return StreamReader(self.manifest_for(coll_name).split(),
90                             self.keep_client(), **kwargs).all_files()[0]
91
92     def read_for_test(self, reader, byte_count, **kwargs):
93         return reader.read(byte_count, **kwargs)
94
95
96 class StreamFileReadFromTestCase(StreamFileReadTestCase):
97     def read_for_test(self, reader, byte_count, **kwargs):
98         return reader.readfrom(0, byte_count, **kwargs)
99
100
101 class StreamFileReadAllTestCase(StreamFileReadTestCase):
102     def read_for_test(self, reader, byte_count, **kwargs):
103         return ''.join(reader.readall(**kwargs))
104
105
106 class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
107     def read_for_test(self, reader, byte_count, **kwargs):
108         return ''.join(reader.readall_decompressed(**kwargs))
109
110
111 class StreamFileReadlinesTestCase(StreamFileReadTestCase):
112     def read_for_test(self, reader, byte_count, **kwargs):
113         return ''.join(reader.readlines(**kwargs))
114
115
116 if __name__ == '__main__':
117     unittest.main()