Merge branch '3616-live-stream' closes #3616
[arvados.git] / sdk / python / tests / test_collections.py
index 284854b31d6849d98605251d14a7a55128f14f7b..98a72f62f824701421fcfb954dc04f0bce4af11e 100644 (file)
@@ -5,6 +5,7 @@
 import arvados
 import bz2
 import copy
+import mock
 import os
 import pprint
 import subprocess
@@ -12,7 +13,7 @@ import tempfile
 import unittest
 
 import run_test_server
-from arvados_testutil import ArvadosBaseTestCase
+import arvados_testutil as tutil
 
 class TestResumableWriter(arvados.ResumableCollectionWriter):
     KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
@@ -22,7 +23,7 @@ class TestResumableWriter(arvados.ResumableCollectionWriter):
 
 
 class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
-                             ArvadosBaseTestCase):
+                             tutil.ArvadosBaseTestCase):
     MAIN_SERVER = {}
 
     @classmethod
@@ -349,8 +350,9 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
     class MockStreamReader(object):
+        """Fake stream reader for tests: readfrom() slices in-memory content."""
         def __init__(self, content):
             self.content = content
+            self.num_retries = 0

-        def readfrom(self, start, size):
+        def readfrom(self, start, size, num_retries=0):
+            # num_retries is accepted for signature compatibility but ignored.
             return self.content[start:start+size]
 
     def test_file_stream(self):
@@ -422,7 +424,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         def __init__(self, content, num_retries=0):
+            # num_retries is accepted for signature compatibility; it is not stored.
             self.content = content
 
-        def get(self, locator):
+        def get(self, locator, num_retries=0):
+            # Return self.content[locator] directly; num_retries is ignored.
             return self.content[locator]
 
     def test_stream_reader(self):
@@ -619,5 +621,136 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
                           cwriter.write, "badtext")
 
 
+class CollectionTestMixin(object):
+    PROXY_RESPONSE = {
+        'items_available': 1,
+        'items': [{
+                'uuid': 'zzzzz-bi6l4-mockproxy012345',
+                'owner_uuid': 'zzzzz-tpzed-mockowner012345',
+                'service_host': tutil.TEST_HOST,
+                'service_port': 65535,
+                'service_ssl_flag': True,
+                'service_type': 'proxy',
+                }]}
+    API_COLLECTIONS = run_test_server.fixture('collections')
+    DEFAULT_COLLECTION = API_COLLECTIONS['foo_file']
+    DEFAULT_DATA_HASH = DEFAULT_COLLECTION['portable_data_hash']
+    DEFAULT_MANIFEST = DEFAULT_COLLECTION['manifest_text']
+    DEFAULT_UUID = DEFAULT_COLLECTION['uuid']
+
+    def _mock_api_call(self, mock_method, code, body):
+        mock_method = mock_method().execute
+        if code == 200:
+            mock_method.return_value = body
+        else:
+            mock_method.side_effect = arvados.errors.ApiError(
+                tutil.fake_httplib2_response(code), "{}")
+
+    def mock_keep_services(self, api_mock, code, body):
+        self._mock_api_call(api_mock.keep_services().accessible, code, body)
+
+    def api_client_mock(self, code=200):
+        client = mock.MagicMock(name='api_client')
+        self.mock_keep_services(client, code, self.PROXY_RESPONSE)
+        return client
+
+
+@tutil.skip_sleep
+class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
+    def mock_get_collection(self, api_mock, code, body):
+        body = self.API_COLLECTIONS.get(body)
+        self._mock_api_call(api_mock.collections().get, code, body)
+
+    def api_client_mock(self, code=200):
+        client = super(CollectionReaderTestCase, self).api_client_mock(code)
+        self.mock_get_collection(client, code, 'foo_file')
+        return client
+
+    def test_init_no_default_retries(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        reader.manifest_text()
+        client.collections().get().execute.assert_called_with(num_retries=0)
+
+    def test_uuid_init_success(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
+                                          num_retries=3)
+        self.assertEqual(self.DEFAULT_COLLECTION['manifest_text'],
+                         reader.manifest_text())
+        client.collections().get().execute.assert_called_with(num_retries=3)
+
+    def test_uuid_init_failure_raises_api_error(self):
+        client = self.api_client_mock(500)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        with self.assertRaises(arvados.errors.ApiError):
+            reader.manifest_text()
+
+    def test_locator_init(self):
+        client = self.api_client_mock(200)
+        # Ensure Keep will not return anything if asked.
+        with tutil.mock_responses(None, 404):
+            reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+                                              api_client=client)
+            self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
+
+    def test_locator_init_falls_back_to_keep(self):
+        # Reading manifests from Keep is deprecated.  Feel free to
+        # remove this test when we remove the fallback.
+        client = self.api_client_mock(200)
+        self.mock_get_collection(client, 404, None)
+        with tutil.mock_responses(self.DEFAULT_MANIFEST, 200):
+            reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+                                              api_client=client, num_retries=3)
+            self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
+
+    def test_init_num_retries_propagated(self):
+        # More of an integration test...
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
+                                          num_retries=3)
+        with tutil.mock_responses('foo', 500, 500, 200):
+            self.assertEqual('foo',
+                             ''.join(f.read(9) for f in reader.all_files()))
+
+
+@tutil.skip_sleep
+class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
+    def mock_keep(self, body, *codes, **headers):
+        headers.setdefault('x-keep-replicas-stored', 2)
+        return tutil.mock_responses(body, *codes, **headers)
+
+    def foo_writer(self, **kwargs):
+        api_client = self.api_client_mock()
+        writer = arvados.CollectionWriter(api_client, **kwargs)
+        writer.start_new_file('foo')
+        writer.write('foo')
+        return writer
+
+    def test_write_whole_collection(self):
+        writer = self.foo_writer()
+        with self.mock_keep(self.DEFAULT_DATA_HASH, 200, 200):
+            self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())
+
+    def test_write_no_default(self):
+        writer = self.foo_writer()
+        with self.mock_keep(None, 500):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                writer.finish()
+
+    def test_write_whole_collection_through_retries(self):
+        writer = self.foo_writer(num_retries=2)
+        with self.mock_keep(self.DEFAULT_DATA_HASH,
+                            500, 500, 200, 500, 500, 200):
+            self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())
+
+    def test_flush_data_retries(self):
+        writer = self.foo_writer(num_retries=2)
+        foo_hash = self.DEFAULT_MANIFEST.split()[1]
+        with self.mock_keep(foo_hash, 500, 200):
+            writer.flush_data()
+        self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
+
+
+# Run this module's tests when it is executed directly.
 if __name__ == '__main__':
     unittest.main()