import arvados
import bz2
import copy
+import mock
import os
import pprint
import subprocess
import unittest
import run_test_server
-from arvados_testutil import ArvadosBaseTestCase
+import arvados_testutil as tutil
class TestResumableWriter(arvados.ResumableCollectionWriter):
    """ResumableCollectionWriter variant with a tiny Keep block size."""

    # Block size override: PUT to Keep every 1K (1024 bytes).
    KEEP_BLOCK_SIZE = 2 ** 10
class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
- ArvadosBaseTestCase):
+ tutil.ArvadosBaseTestCase):
MAIN_SERVER = {}
@classmethod
class MockStreamReader(object):
def __init__(self, content):
self.content = content
+ self.num_retries = 0
- def readfrom(self, start, size):
+ def readfrom(self, start, size, num_retries=0):
return self.content[start:start+size]
def test_file_stream(self):
class MockKeep(object):
- def __init__(self, content):
+ def __init__(self, content, num_retries=0):
self.content = content
- def get(self, locator):
+ def get(self, locator, num_retries=0):
return self.content[locator]
def test_stream_reader(self):
cwriter.write, "badtext")
class CollectionTestMixin(object):
    """Shared fixtures and mock-building helpers for collection tests.

    Provides a canned Keep proxy listing, the ``foo_file`` collection
    fixture, and helpers that configure a ``mock.MagicMock`` API client so
    that a mocked request's ``execute`` either returns a body or raises
    ``arvados.errors.ApiError``.
    """

    # Canned keep_services().accessible() response: one proxy service.
    PROXY_RESPONSE = {
        'items_available': 1,
        'items': [{
            'uuid': 'zzzzz-bi6l4-mockproxy012345',
            'owner_uuid': 'zzzzz-tpzed-mockowner012345',
            'service_host': tutil.TEST_HOST,
            'service_port': 65535,
            'service_ssl_flag': True,
            'service_type': 'proxy',
        }]}
    # Collection fixtures loaded via run_test_server, indexed by name.
    API_COLLECTIONS = run_test_server.fixture('collections')
    DEFAULT_COLLECTION = API_COLLECTIONS['foo_file']
    DEFAULT_DATA_HASH = DEFAULT_COLLECTION['portable_data_hash']
    DEFAULT_MANIFEST = DEFAULT_COLLECTION['manifest_text']
    DEFAULT_UUID = DEFAULT_COLLECTION['uuid']

    def _mock_api_call(self, mock_method, code, body):
        """Configure the ``execute`` of a mocked API request.

        :param mock_method: mock standing in for an API method; calling it
            yields the request mock whose ``execute`` is configured.
        :param code: HTTP status to simulate. 200 makes ``execute`` return
            ``body``; any other value makes it raise
            ``arvados.errors.ApiError`` built from that status.
        :param body: response returned on success.
        """
        execute = mock_method().execute
        if code == 200:
            # Hand out a copy so code under test cannot mutate the shared
            # class-level fixtures (PROXY_RESPONSE, API_COLLECTIONS).
            execute.return_value = copy.deepcopy(body)
            # An exception side_effect takes precedence over return_value,
            # so clear any error configured by an earlier call.
            execute.side_effect = None
        else:
            execute.side_effect = arvados.errors.ApiError(
                tutil.fake_httplib2_response(code), "{}")

    def mock_keep_services(self, api_mock, code, body):
        """Mock ``api_mock.keep_services().accessible().execute()``.

        Returns ``body`` when ``code`` is 200, otherwise raises ApiError.
        """
        self._mock_api_call(api_mock.keep_services().accessible, code, body)

    def api_client_mock(self, code=200):
        """Return a MagicMock API client whose Keep service listing answers
        with ``PROXY_RESPONSE`` at HTTP status ``code``."""
        client = mock.MagicMock(name='api_client')
        self.mock_keep_services(client, code, self.PROXY_RESPONSE)
        return client
+
+
@tutil.skip_sleep
class CollectionReaderTestCase(CollectionTestMixin, unittest.TestCase):
    """CollectionReader tests driven by a mocked API client and Keep.

    The mixin is listed before unittest.TestCase so that its attributes take
    precedence in the method resolution order.
    """

    def mock_get_collection(self, api_mock, code, body):
        """Mock ``api_mock.collections().get().execute()``.

        :param code: HTTP status to simulate (200 succeeds, others raise).
        :param body: name of an ``API_COLLECTIONS`` fixture to return on
            success; ``None`` or an unknown name yields ``None``.
        """
        fixture = self.API_COLLECTIONS.get(body)
        self._mock_api_call(api_mock.collections().get, code, fixture)

    def api_client_mock(self, code=200):
        """Extend the mixin's client so collections().get() also answers
        with the ``foo_file`` fixture at HTTP status ``code``."""
        client = super(CollectionReaderTestCase, self).api_client_mock(code)
        self.mock_get_collection(client, code, 'foo_file')
        return client

    def test_init_no_default_retries(self):
        client = self.api_client_mock(200)
        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
        reader.manifest_text()
        client.collections().get().execute.assert_called_with(num_retries=0)

    def test_uuid_init_success(self):
        client = self.api_client_mock(200)
        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
                                          num_retries=3)
        self.assertEqual(self.DEFAULT_COLLECTION['manifest_text'],
                         reader.manifest_text())
        client.collections().get().execute.assert_called_with(num_retries=3)

    def test_uuid_init_failure_raises_api_error(self):
        client = self.api_client_mock(500)
        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
        with self.assertRaises(arvados.errors.ApiError):
            reader.manifest_text()

    def test_locator_init(self):
        client = self.api_client_mock(200)
        # Ensure Keep will not return anything if asked, so the manifest
        # must come from the mocked API server.
        with tutil.mock_responses(None, 404):
            reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
                                              api_client=client)
            self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())

    def test_locator_init_falls_back_to_keep(self):
        # Reading manifests from Keep is deprecated. Feel free to
        # remove this test when we remove the fallback.
        client = self.api_client_mock(200)
        # Override the API lookup so it fails and Keep must be consulted.
        self.mock_get_collection(client, 404, None)
        with tutil.mock_responses(self.DEFAULT_MANIFEST, 200):
            reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
                                              api_client=client, num_retries=3)
            self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())

    def test_init_num_retries_propagated(self):
        # More of an integration test...
        client = self.api_client_mock(200)
        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
                                          num_retries=3)
        # Keep fails twice before succeeding; the reader's num_retries=3
        # must carry through to the file reads for this to pass.
        with tutil.mock_responses('foo', 500, 500, 200):
            self.assertEqual('foo',
                             ''.join(f.read(9) for f in reader.all_files()))
+
+
@tutil.skip_sleep
class CollectionWriterTestCase(CollectionTestMixin, unittest.TestCase):
    """CollectionWriter tests driven by a mocked API client and Keep.

    The mixin is listed before unittest.TestCase so that its attributes take
    precedence in the method resolution order.
    """

    def mock_keep(self, body, *codes, **headers):
        """Return ``tutil.mock_responses(body, *codes, **headers)``, with
        the ``x-keep-replicas-stored`` header defaulting to 2."""
        headers.setdefault('x-keep-replicas-stored', 2)
        return tutil.mock_responses(body, *codes, **headers)

    def foo_writer(self, **kwargs):
        """Return a CollectionWriter on a mocked API client that has had
        'foo' written to a new file named 'foo'.

        Keyword arguments are passed through to CollectionWriter.
        """
        api_client = self.api_client_mock()
        writer = arvados.CollectionWriter(api_client, **kwargs)
        writer.start_new_file('foo')
        writer.write('foo')
        return writer

    def test_write_whole_collection(self):
        writer = self.foo_writer()
        with self.mock_keep(self.DEFAULT_DATA_HASH, 200, 200):
            self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())

    def test_write_no_default(self):
        # Without num_retries, a single Keep failure must surface.
        writer = self.foo_writer()
        with self.mock_keep(None, 500):
            with self.assertRaises(arvados.errors.KeepWriteError):
                writer.finish()

    def test_write_whole_collection_through_retries(self):
        # Two PUTs (presumably data, then manifest), each succeeding on
        # its third attempt, within num_retries=2.
        writer = self.foo_writer(num_retries=2)
        with self.mock_keep(self.DEFAULT_DATA_HASH,
                            500, 500, 200, 500, 500, 200):
            self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())

    def test_flush_data_retries(self):
        writer = self.foo_writer(num_retries=2)
        # The second manifest token is the locator of the 'foo' data block.
        foo_hash = self.DEFAULT_MANIFEST.split()[1]
        with self.mock_keep(foo_hash, 500, 200):
            writer.flush_data()
        self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
+
+
# Run this module's tests when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main(module="__main__")