21910: Merge branch 'main' into 21910-remove-api_client_id
[arvados.git] / sdk / python / tests / test_stream.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import bz2
6 import gzip
7 import io
8 import os
9 import unittest
10 import hashlib
11
12 from unittest import mock
13
14 import arvados
15
16 from . import arvados_testutil as tutil
17 from . import run_test_server
18
class StreamFileReaderTestMixin:
    """Reusable read, seek, readline and decompression tests for file readers.

    Mix this into a ``unittest.TestCase`` subclass: the tests call
    ``assertEqual``, ``assertRaises``, ``assertTrue`` and ``assertFalse``.
    The concrete class must also provide three factory methods:

    ``make_count_reader()``
        A reader whose complete content is ``b'123456789'``.  A single
        ``read(10)`` on it is expected to return only ``b'123'`` (read()
        calls are expected to be aligned on block boundaries, see #3663).
    ``make_newlines_reader()``
        A reader whose lines are ``NEWLINES_CONTENT_LINES``.  Its
        ``readline()``/``readlines()`` return ``str`` while ``readall()``
        yields ``bytes`` chunks.
    ``make_file_reader(name=..., data=...)``
        A reader for a file called *name*; *data*, when given, is the raw
        (possibly compressed) file content.
    """

    # Lines the reader from make_newlines_reader() is expected to contain.
    # Kept as a tuple so no test can mutate the shared expectation.
    NEWLINES_CONTENT_LINES = ('one\n', 'two\n', '\n', 'three\n', 'four\n', '\n')

    def test_read_block_crossing_behavior(self):
        # read() calls will be aligned on block boundaries - see #3663.
        sfile = self.make_count_reader()
        self.assertEqual(b'123', sfile.read(10))

    def test_small_read(self):
        sfile = self.make_count_reader()
        self.assertEqual(b'12', sfile.read(2))

    def test_successive_reads(self):
        sfile = self.make_count_reader()
        for expect in [b'1234', b'5678', b'9', b'']:
            self.assertEqual(expect, sfile.read(4))

    def test_readfrom_spans_blocks(self):
        # A size past the end of the content returns everything up to EOF.
        sfile = self.make_count_reader()
        self.assertEqual(b'6789', sfile.readfrom(5, 12))

    def test_small_readfrom_spanning_blocks(self):
        sfile = self.make_count_reader()
        self.assertEqual(b'2345', sfile.readfrom(1, 4))

    def test_readall(self):
        # readall() yields the content as a sequence of bytes chunks.
        sfile = self.make_count_reader()
        self.assertEqual(b'123456789', b''.join(sfile.readall()))

    def test_one_arg_seek(self):
        # Same checks as test_absolute_seek, with seek()'s whence omitted.
        self.test_absolute_seek(())

    def test_absolute_seek(self, args=(os.SEEK_SET,)):
        # ``args`` holds the extra positional arguments given to seek().  A
        # tuple default avoids sharing one mutable list between calls.
        sfile = self.make_count_reader()
        sfile.seek(6, *args)
        self.assertEqual(b'78', sfile.read(2))
        sfile.seek(4, *args)
        self.assertEqual(b'56', sfile.read(2))

    def test_relative_seek(self, args=(os.SEEK_CUR,)):
        # ``args`` holds the extra positional arguments given to seek().
        sfile = self.make_count_reader()
        self.assertEqual(b'12', sfile.read(2))
        sfile.seek(2, *args)  # position 2 -> 4
        self.assertEqual(b'56', sfile.read(2))

    def test_end_seek(self):
        sfile = self.make_count_reader()
        sfile.seek(-6, os.SEEK_END)  # 9 - 6 -> position 3
        self.assertEqual(b'45', sfile.read(2))

    def test_seek_min_zero(self):
        # Seeking before the start fails and leaves the position unchanged.
        sfile = self.make_count_reader()
        self.assertEqual(0, sfile.tell())
        with self.assertRaises(IOError):
            sfile.seek(-2, os.SEEK_SET)
        self.assertEqual(0, sfile.tell())

    def test_seek_max_size(self):
        sfile = self.make_count_reader()
        sfile.seek(2, os.SEEK_END)
        # POSIX permits seeking past end of file.
        self.assertEqual(11, sfile.tell())

    def test_size(self):
        self.assertEqual(9, self.make_count_reader().size())

    def test_tell_after_small_read(self):
        sfile = self.make_count_reader()
        sfile.read(1)
        self.assertEqual(1, sfile.tell())

    def test_no_read_after_close(self):
        sfile = self.make_count_reader()
        sfile.close()
        with self.assertRaises(ValueError):
            sfile.read(2)

    def test_context(self):
        with self.make_count_reader() as sfile:
            self.assertFalse(sfile.closed, "reader is closed inside context")
            self.assertEqual(b'12', sfile.read(2))
        self.assertTrue(sfile.closed, "reader is open after context")

    def check_lines(self, actual):
        """Assert *actual* is exactly the list of newlines-reader lines."""
        self.assertEqual(list(self.NEWLINES_CONTENT_LINES), actual)

    def test_readline(self):
        reader = self.make_newlines_reader()
        actual = []
        while True:
            data = reader.readline()
            if not data:
                break
            actual.append(data)
        self.check_lines(actual)

    def test_readlines(self):
        self.check_lines(self.make_newlines_reader().readlines())

    def test_iteration(self):
        self.check_lines(list(iter(self.make_newlines_reader())))

    def test_readline_size(self):
        # readline(size) returns at most ``size`` characters and stops early
        # right after a newline.
        reader = self.make_newlines_reader()
        self.assertEqual('on', reader.readline(2))
        self.assertEqual('e\n', reader.readline(4))
        self.assertEqual('two\n', reader.readline(6))
        self.assertEqual('\n', reader.readline(8))
        self.assertEqual('thre', reader.readline(4))

    def test_readlines_sizehint(self):
        # NOTE(review): every line is expected even though the hint (8) is
        # smaller than the content -- confirm this is the intended
        # readlines(sizehint) contract of the reader under test.
        self.check_lines(self.make_newlines_reader().readlines(8))

    def test_name_attribute(self):
        sfile = self.make_file_reader(name='nametest')
        self.assertEqual('nametest', sfile.name)

    def check_decompressed_name(self, filename, expect):
        """Assert decompressed_name() of a reader named *filename* is *expect*."""
        reader = self.make_file_reader(name=filename)
        self.assertEqual(expect, reader.decompressed_name())

    def test_decompressed_name_uncompressed_file(self):
        self.check_decompressed_name('test.log', 'test.log')

    def test_decompressed_name_gzip_file(self):
        self.check_decompressed_name('test.log.gz', 'test.log')

    def test_decompressed_name_bz2_file(self):
        self.check_decompressed_name('test.log.bz2', 'test.log')

    def check_decompression(self, compress_ext, compress_func):
        """Round-trip a sample text through *compress_func* and the reader.

        The compressed bytes become the content of a file named
        ``test.<compress_ext>``; readall_decompressed() must yield the
        original text.
        """
        test_text = b'decompression\ntest\n'
        test_data = compress_func(test_text)
        reader = self.make_file_reader(name='test.' + compress_ext, data=test_data)
        self.assertEqual(test_text, b''.join(reader.readall_decompressed()))

    @staticmethod
    def gzip_compress(data):
        """Return *data* (bytes) compressed in gzip format."""
        return gzip.compress(data)

    def test_no_decompression(self):
        self.check_decompression('log', lambda s: s)

    def test_gzip_decompression(self):
        self.check_decompression('gz', self.gzip_compress)

    def test_bz2_decompression(self):
        self.check_decompression('bz2', bz2.compress)

    def test_readline_then_readlines(self):
        reader = self.make_newlines_reader()
        first, *rest = self.NEWLINES_CONTENT_LINES
        self.assertEqual(first, reader.readline())
        self.assertEqual(rest, reader.readlines())

    def test_readline_then_readall(self):
        reader = self.make_newlines_reader()
        first, *rest = self.NEWLINES_CONTENT_LINES
        self.assertEqual(first, reader.readline())
        # readall() yields bytes even though readline() returned str.
        self.assertEqual(''.join(rest).encode(), b''.join(reader.readall()))
180         self.assertEqual('one\n', data)
181         self.assertEqual(b''.join([b'two\n', b'\n', b'three\n', b'four\n', b'\n']), b''.join(reader.readall()))
182
183
class StreamRetryTestMixin:
    """Retry-behavior tests for reading collection files through Keep.

    Mix this into a ``unittest.TestCase`` subclass that defines:

    ``reader_for(coll_name, **kwargs)``
        Build a reader for the collection fixture named *coll_name*,
        forwarding keyword arguments such as ``num_retries``.
    ``read_for_test(reader, size, **kwargs)``
        Read *size* bytes from *reader*, forwarding keyword arguments such
        as ``num_retries``.

    Every test is decorated with ``tutil.skip_sleep`` and runs inside
    ``tutil.mock_keep_responses(body, *codes)``; the trailing integers are
    presumably the HTTP status codes served in order (confirm in
    arvados_testutil).
    """

    # Collection fixture records indexed by fixture name; evaluated once,
    # when this class body executes.
    API_COLLECTIONS = run_test_server.fixture('collections')

    def keep_client(self):
        """Return a KeepClient whose proxy is port 1 on ``tutil.TEST_HOST``
        and whose ``local_store`` is empty."""
        proxy_url = f'http://[{tutil.TEST_HOST}]:1'
        return arvados.KeepClient(proxy=proxy_url, local_store='')

    def manifest_for(self, coll_name):
        """Return the manifest text of the collection fixture *coll_name*."""
        fixture = self.API_COLLECTIONS[coll_name]
        return fixture['manifest_text']

    @tutil.skip_sleep
    def test_success_without_retries(self):
        with tutil.mock_keep_responses('bar', 200):
            rdr = self.reader_for('bar_file')
            content = self.read_for_test(rdr, 3)
            self.assertEqual(b'bar', content)

    @tutil.skip_sleep
    def test_read_with_instance_retries(self):
        # Retries configured on the reader itself.
        with tutil.mock_keep_responses('foo', 500, 200):
            rdr = self.reader_for('foo_file', num_retries=3)
            content = self.read_for_test(rdr, 3)
            self.assertEqual(b'foo', content)

    @tutil.skip_sleep
    def test_read_with_method_retries(self):
        # Retries passed to the individual read call.
        with tutil.mock_keep_responses('foo', 500, 200):
            rdr = self.reader_for('foo_file')
            content = self.read_for_test(rdr, 3, num_retries=3)
            self.assertEqual(b'foo', content)

    @tutil.skip_sleep
    def test_read_instance_retries_exhausted(self):
        with tutil.mock_keep_responses('bar', 500, 500, 500, 500, 200):
            rdr = self.reader_for('bar_file', num_retries=3)
            self.assertRaises(arvados.errors.KeepReadError,
                              self.read_for_test, rdr, 3)

    @tutil.skip_sleep
    def test_read_method_retries_exhausted(self):
        with tutil.mock_keep_responses('bar', 500, 500, 500, 500, 200):
            rdr = self.reader_for('bar_file')
            self.assertRaises(arvados.errors.KeepReadError,
                              self.read_for_test, rdr, 3, num_retries=3)

    @tutil.skip_sleep
    def test_method_retries_take_precedence(self):
        # The per-call num_retries=1 is expected to override the reader's
        # num_retries=10, so the read fails.
        with tutil.mock_keep_responses('', 500, 500, 500, 200):
            rdr = self.reader_for('user_agreement', num_retries=10)
            self.assertRaises(arvados.errors.KeepReadError,
                              self.read_for_test, rdr, 10, num_retries=1)
233             with self.assertRaises(arvados.errors.KeepReadError):
234                 self.read_for_test(reader, 10, num_retries=1)
235
236
if __name__ == '__main__':
    # Run this module's tests when it is executed directly as a script.
    unittest.main()