3147: Add retry support to PySDK StreamReader classes.
[arvados.git] / sdk / python / tests / test_stream.py
index 060a26a8ace370cd259dcae701c4fe649f3c9d1b..3970d672a6a46ac1ada71ca553bdbdd94834cd5e 100644 (file)
@@ -1,18 +1,82 @@
 #!/usr/bin/env python
 
+import mock
 import unittest
 
 import arvados
 from arvados import StreamReader, StreamFileReader
 
+import arvados_testutil as tutil
 import run_test_server
 
-class StreamReaderTestCase(unittest.TestCase):
+class StreamRetryTestMixin(object):
+    # Define reader_for(coll_name, **kwargs)
+    # and read_for_test(reader, size, **kwargs).
     API_COLLECTIONS = run_test_server.fixture('collections')
 
+    def keep_client(self):
+        return arvados.KeepClient(proxy='http://[%s]:1' % (tutil.TEST_HOST,),
+                                  local_store='')
+
     def manifest_for(self, coll_name):
         return self.API_COLLECTIONS[coll_name]['manifest_text']
 
+    @tutil.skip_sleep
+    def test_success_without_retries(self):
+        reader = self.reader_for('bar_file')
+        with tutil.mock_responses('bar', 200):
+            self.assertEqual('bar', self.read_for_test(reader, 3))
+
+    @tutil.skip_sleep
+    def test_read_no_default_retry(self):
+        reader = self.reader_for('user_agreement')
+        with tutil.mock_responses('', 500):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                self.read_for_test(reader, 10)
+
+    @tutil.skip_sleep
+    def test_read_with_instance_retries(self):
+        reader = self.reader_for('foo_file', num_retries=3)
+        with tutil.mock_responses('foo', 500, 200):
+            self.assertEqual('foo', self.read_for_test(reader, 3))
+
+    @tutil.skip_sleep
+    def test_read_with_method_retries(self):
+        reader = self.reader_for('foo_file')
+        with tutil.mock_responses('foo', 500, 200):
+            self.assertEqual('foo',
+                             self.read_for_test(reader, 3, num_retries=3))
+
+    @tutil.skip_sleep
+    def test_read_instance_retries_exhausted(self):
+        reader = self.reader_for('bar_file', num_retries=3)
+        with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                self.read_for_test(reader, 3)
+
+    @tutil.skip_sleep
+    def test_read_method_retries_exhausted(self):
+        reader = self.reader_for('bar_file')
+        with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                self.read_for_test(reader, 3, num_retries=3)
+
+    @tutil.skip_sleep
+    def test_method_retries_take_precedence(self):
+        reader = self.reader_for('user_agreement', num_retries=10)
+        with tutil.mock_responses('', 500, 500, 500, 200):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                self.read_for_test(reader, 10, num_retries=1)
+
+
+class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
+    def reader_for(self, coll_name, **kwargs):
+        return StreamReader(self.manifest_for(coll_name).split(),
+                            self.keep_client(), **kwargs)
+
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return reader.readfrom(0, byte_count, **kwargs)
+
     def test_manifest_text_without_keep_client(self):
         mtext = self.manifest_for('multilevel_collection_1')
         for line in mtext.rstrip('\n').split('\n'):
@@ -20,5 +84,34 @@ class StreamReaderTestCase(unittest.TestCase):
             self.assertEqual(line + '\n', reader.manifest_text())
 
 
+class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
+    def reader_for(self, coll_name, **kwargs):
+        return StreamReader(self.manifest_for(coll_name).split(),
+                            self.keep_client(), **kwargs).all_files()[0]
+
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return reader.read(byte_count, **kwargs)
+
+
+class StreamFileReadFromTestCase(StreamFileReadTestCase):
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return reader.readfrom(0, byte_count, **kwargs)
+
+
+class StreamFileReadAllTestCase(StreamFileReadTestCase):
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return ''.join(reader.readall(**kwargs))
+
+
+class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return ''.join(reader.readall_decompressed(**kwargs))
+
+
+class StreamFileReadlinesTestCase(StreamFileReadTestCase):
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return ''.join(reader.readlines(**kwargs))
+
+
 if __name__ == '__main__':
     unittest.main()