# Load the test configuration from the JSON file named by the first
# command-line argument.  The context manager closes the file as soon as it
# has been parsed instead of leaving the handle open for the rest of the run.
with open(sys.argv[1]) as config_file:
    j = json.load(config_file)

# Client for the first configured host, authenticated with the first
# superuser token.  insecure=True is passed to every client created here.
apiA = arvados.api(host=j["arvados_api_hosts"][0], token=j["superuser_tokens"][0], insecure=True)

# Look up the authorization record for the token in use on A and build a
# "v2/<uuid>/<api_token>" string from it; that string is the token used for
# the second and third hosts.
tok = apiA.api_client_authorizations().current().execute()
v2_token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])

# Clients for the second and third configured hosts.
apiB = arvados.api(host=j["arvados_api_hosts"][1], token=v2_token, insecure=True)
apiC = arvados.api(host=j["arvados_api_hosts"][2], token=v2_token, insecure=True)
15 ### Check users on API server "A" (the LoginCluster) ###
# NOTE(review): this excerpt appears to omit some original statements (the
# binding of `by_username`, the assignments to `found`, and whatever binds
# `users` before the first assertion) -- confirm against the full file.
#
# The user listing under test must contain exactly 11 items.
19 assert len(users["items"]) == 11
# For each N in 1..9, look for the user whose username is "caseN" and whose
# email is "caseN@test", and record its uuid in by_username keyed by username.
21 for i in range(1, 10):
23 for u in users["items"]:
24 if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i):
26 by_username[u["username"]] = u["uuid"]
# For each N in 1..8, look for a matching user whose is_active is exactly True.
30 for i in (1, 2, 3, 4, 5, 6, 7, 8):
32 for u in users["items"]:
33 if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and u["is_active"] is True:
# `found` is assigned outside the lines visible here; the failure message
# names the case number that was being searched for.
35 assert found, "Not found case%i" % i
37 # case9 should not be active
# The match also requires the uuid to equal the one recorded in by_username
# and is_active to be exactly False.
39 for u in users["items"]:
40 if (u["username"] == "case9" and u["email"] == "case9@test" and
41 u["uuid"] == by_username[u["username"]] and u["is_active"] is False):
# List the users on A twice: first with default arguments, then with
# bypass_federation=True.  What is done with each result is not visible here.
45 users = apiA.users().list().execute()
48 users = apiA.users().list(bypass_federation=True).execute()
52 ### Check users on API server "B" (federation member) ###
55 # check for expected migrations on B
# List users on B with bypass_federation=True; exactly 11 items are expected.
56 users = apiB.users().list(bypass_federation=True).execute()
57 assert len(users["items"]) == 11
# NOTE(review): the loop header that binds `i` for this search is not visible
# in this excerpt -- confirm the set of cases against the full file.
# A match requires username "caseN", email "caseN@test", the same uuid that
# is stored in by_username for that username, and is_active exactly True.
61 for u in users["items"]:
62 if (u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and
63 u["uuid"] == by_username[u["username"]] and u["is_active"] is True):
# `found` is assigned outside the lines visible here.
65 assert found, "Not found case%i" % i
# Look for case9 on B with the uuid stored in by_username and is_active
# exactly False.
68 for u in users["items"]:
69 if (u["username"] == "case9" and u["email"] == "case9@test" and
70 u["uuid"] == by_username[u["username"]] and u["is_active"] is False):
74 # check that federated user listing works
# Same list call without bypass_federation; any check on the result is not
# visible here.
75 users = apiB.users().list().execute()
79 ### Check users on API server "C" (federation member) ###
82 # check for expected migrations on C
# List users on C with bypass_federation=True; exactly 8 items are expected.
83 users = apiC.users().list(bypass_federation=True).execute()
84 assert len(users["items"]) == 8
# For cases 2, 4, 6, 7 and 8, look for a user with username "caseN", email
# "caseN@test", the uuid stored in by_username for that username, and
# is_active exactly True.
86 for i in (2, 4, 6, 7, 8):
88 for u in users["items"]:
89 if (u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and
90 u["uuid"] == by_username[u["username"]] and u["is_active"] is True):
94 # cases 3, 5, 9 involve users that have never accessed cluster C so
95 # there's nothing to migrate.
# NOTE(review): the loop header that binds `i` here, and the statement that
# this condition guards, are not visible in this excerpt -- confirm against
# the full file.
98 for u in users["items"]:
99 if (u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and
100 u["uuid"] == by_username[u["username"]] and u["is_active"] is True):
104 # check that federated user listing works
# Same list call without bypass_federation; any check on the result is not
# visible here.
105 users = apiC.users().list().execute()
# Printed once the statements above have all run.
108 print("Passed checks")