class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
def _getKeepServerConfig():
- for config_file in ['application.yml', 'application.default.yml']:
- with open(os.path.join(run_test_server.SERVICES_SRC_DIR,
- "api", "config", config_file)) as f:
+ for config_file, mandatory in [
+ ['application.yml', True], ['application.default.yml', False]]:
+ path = os.path.join(run_test_server.SERVICES_SRC_DIR,
+ "api", "config", config_file)
+ if not mandatory and not os.path.exists(path):
+ continue
+ with open(path) as f:
rails_config = yaml.load(f.read())
for config_section in ['test', 'common']:
try:
"ARVADOS_API_HOST_INSECURE",
"ARVADOS_API_TOKEN"]:
self.ENVIRON[v] = arvados.config.settings()[v]
- arv_put.api_client = arvados.api('v1', cache=False)
+ arv_put.api_client = arvados.api('v1')
def current_user(self):
return arv_put.api_client.users().current().execute()
def test_check_real_project_found(self):
self.authorize_with('active')
- self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID),
+ self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
"did not correctly find test fixture project")
def test_check_error_finding_nonexistent_uuid(self):
BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
self.authorize_with('active')
try:
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+ result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ 0)
except ValueError as error:
self.assertIn(BAD_UUID, error.message)
else:
BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
self.authorize_with('active')
with self.assertRaises(apiclient.errors.HttpError):
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+ result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ 0)
def test_short_put_from_stdin(self):
# Have to run this as an integration test since arv-put can't
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=self.ENVIRON)
stdout, stderr = pipe.communicate(text)
- collection_list = arvados.api('v1', cache=False).collections().list(
- filters=[['portable_data_hash', '=', stdout.strip()]]).execute().get('items', [])
+ search_key = ('portable_data_hash'
+ if '--portable-data-hash' in extra_args else 'uuid')
+ collection_list = arvados.api('v1').collections().list(
+ filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_put_collection_with_high_redundancy(self):
+     # An empty payload is enough here: CollectionWriter is not what
+     # is under test.  The point is to confirm that the replication
+     # level requested on the command line is passed along to the
+     # API server by collections.create.
+     replication_args = ['--replication', '4']
+     created = self.run_and_find_collection("", replication_args)
+     self.assertEqual(4, created['replication_desired'])
+
+ def test_put_collection_with_default_redundancy(self):
+     # No --replication argument is passed, so the stored collection
+     # is expected to carry no explicit replication_desired value.
+     collection = self.run_and_find_collection("")
+     # assertIsNone checks identity with None and reports the actual
+     # value on failure, unlike assertEqual(None, ...).
+     self.assertIsNone(collection['replication_desired'])
+
def test_put_collection_with_unnamed_project_link(self):
- link = self.run_and_find_collection("Test unnamed collection",
- ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
+ link = self.run_and_find_collection(
+ "Test unnamed collection",
+ ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
username = pwd.getpwuid(os.getuid()).pw_name
self.assertRegexpMatches(
link['name'],
def test_put_collection_with_name_and_no_project(self):
link_name = 'Test Collection Link in home project'
- collection = self.run_and_find_collection("Test named collection in home project",
- ['--portable-data-hash', '--name', link_name])
+ collection = self.run_and_find_collection(
+ "Test named collection in home project",
+ ['--portable-data-hash', '--name', link_name])
self.assertEqual(link_name, collection['name'])
my_user_uuid = self.current_user()['uuid']
self.assertEqual(my_user_uuid, collection['owner_uuid'])