+
+class StorageClassesTest(IntegrationTest):
+ mnt_args = [
+ '--read-write',
+ '--mount-home', 'homedir',
+ ]
+
+ def setUp(self):
+ super(StorageClassesTest, self).setUp()
+ self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+
+ @IntegrationTest.mount(argv=mnt_args)
+ def test_collection_default_storage_classes(self):
+ coll_path = os.path.join(self.mnt, 'homedir', 'a_collection')
+ self.api.collections().create(body={'name':'a_collection'}).execute()
+ self.pool_test(coll_path)
+ @staticmethod
+ def _test_collection_default_storage_classes(self, coll):
+ self.assertEqual(storage_classes_desired(coll), ['default'])
+
+ @IntegrationTest.mount(argv=mnt_args+['--storage-classes', 'foo'])
+ def test_collection_custom_storage_classes(self):
+ coll_path = os.path.join(self.mnt, 'homedir', 'new_coll')
+ os.mkdir(coll_path)
+ self.pool_test(coll_path)
+ @staticmethod
+ def _test_collection_custom_storage_classes(self, coll):
+ self.assertEqual(storage_classes_desired(coll), ['foo'])
+
+def _readonlyCollectionTestHelper(mounttmp):
+ f = open(os.path.join(mounttmp, 'thing1.txt'), 'rt')
+ # Testing that close() doesn't raise an error.
+ f.close()
+
+class ReadonlyCollectionTest(MountTestBase):
+ def setUp(self):
+ super(ReadonlyCollectionTest, self).setUp()
+ cw = arvados.collection.Collection()
+ with cw.open('thing1.txt', 'wt') as f:
+ f.write("data 1")
+ cw.save_new(owner_uuid=run_test_server.fixture("groups")["aproject"]["uuid"])
+ self.testcollection = cw.api_response()
+
+ def runTest(self):
+ settings = arvados.config.settings().copy()
+ settings["ARVADOS_API_TOKEN"] = run_test_server.fixture("api_client_authorizations")["project_viewer"]["api_token"]
+ self.api = arvados.safeapi.ThreadSafeApiCache(settings)
+ self.make_mount(fuse.CollectionDirectory, collection_record=self.testcollection, enable_write=False)
+
+ self.pool.apply(_readonlyCollectionTestHelper, (self.mounttmp,))