16263: User migration test also checks federated user behavior
authorPeter Amstutz <peter.amstutz@curii.com>
Thu, 2 Apr 2020 14:52:44 +0000 (10:52 -0400)
committerPeter Amstutz <peter.amstutz@curii.com>
Mon, 13 Apr 2020 15:27:36 +0000 (11:27 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/tests/fed-migrate/check.py

index 8165b3eba73dd3ef4adf04083b039bc67bbb26c0..a2c0096165c74b7bc1fda0daf212177cb4d08ac2 100644 (file)
@@ -5,47 +5,54 @@ import sys
 j = json.load(open(sys.argv[1]))
 
 apiA = arvados.api(host=j["arvados_api_hosts"][0], token=j["superuser_tokens"][0], insecure=True)
-apiB = arvados.api(host=j["arvados_api_hosts"][1], token=j["superuser_tokens"][1], insecure=True)
-apiC = arvados.api(host=j["arvados_api_hosts"][2], token=j["superuser_tokens"][2], insecure=True)
+tok = apiA.api_client_authorizations().current().execute()
+v2_token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
+
+apiB = arvados.api(host=j["arvados_api_hosts"][1], token=v2_token, insecure=True)
+apiC = arvados.api(host=j["arvados_api_hosts"][2], token=v2_token, insecure=True)
 
 ###
 ### Check users on API server "A" (the LoginCluster) ###
 ###
-
-users = apiA.users().list(bypass_federation=True).execute()
-
-assert len(users["items"]) == 11
-
 by_username = {}
-
-for i in range(1, 10):
+def check_A(users):
+    assert len(users["items"]) == 11
+
+    for i in range(1, 10):
+        found = False
+        for u in users["items"]:
+            if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i):
+                found = True
+                by_username[u["username"]] = u["uuid"]
+        assert found
+
+    # Should be active
+    for i in (1, 2, 3, 4, 5, 6, 7, 8):
+        found = False
+        for u in users["items"]:
+            if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and u["is_active"] is True:
+                found = True
+        assert found, "Not found case%i" % i
+
+    # case9 should not be active
     found = False
     for u in users["items"]:
-        if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i):
+        if (u["username"] == "case9" and u["email"] == "case9@test" and
+            u["uuid"] == by_username[u["username"]] and u["is_active"] is False):
             found = True
-            by_username[u["username"]] = u["uuid"]
     assert found
 
-# Should be active
-for i in (1, 2, 3, 4, 5, 6, 7, 8):
-    found = False
-    for u in users["items"]:
-        if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and u["is_active"] is True:
-            found = True
-    assert found, "Not found case%i" % i
-
-# case9 should not be active
-found = False
-for u in users["items"]:
-    if (u["username"] == "case9" and u["email"] == "case9@test" and
-        u["uuid"] == by_username[u["username"]] and u["is_active"] is False):
-        found = True
-assert found
+users = apiA.users().list().execute()
+check_A(users)
 
+users = apiA.users().list(bypass_federation=True).execute()
+check_A(users)
 
 ###
 ### Check users on API server "B" (federation member) ###
 ###
+
+# check for expected migrations on B
 users = apiB.users().list(bypass_federation=True).execute()
 assert len(users["items"]) == 11
 
@@ -64,10 +71,15 @@ for u in users["items"]:
         found = True
 assert found
 
+# check that federated user listing works
+users = apiB.users().list().execute()
+check_A(users)
 
 ###
 ### Check users on API server "C" (federation member) ###
 ###
+
+# check for expected migrations on C
 users = apiC.users().list(bypass_federation=True).execute()
 assert len(users["items"]) == 8
 
@@ -89,4 +101,8 @@ for i in (3, 5, 9):
             found = True
     assert not found
 
+# check that federated user listing works
+users = apiC.users().list().execute()
+check_A(users)
+
 print("Passed checks")