Merge branch '16377-missing-default' refs #16377
[arvados.git] / sdk / python / tests / fed-migrate / check.py
index 8f494be2fb4448ed3135c05328b12f3d398f55a1..c231cc0735795cae9577f9e52f7e5f4bae449bb3 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 import arvados
 import json
 import sys
@@ -5,47 +9,92 @@ import sys
 j = json.load(open(sys.argv[1]))
 
 apiA = arvados.api(host=j["arvados_api_hosts"][0], token=j["superuser_tokens"][0], insecure=True)
-apiB = arvados.api(host=j["arvados_api_hosts"][1], token=j["superuser_tokens"][1], insecure=True)
-apiC = arvados.api(host=j["arvados_api_hosts"][2], token=j["superuser_tokens"][2], insecure=True)
-
-users = apiA.users().list().execute()
+tok = apiA.api_client_authorizations().current().execute()
+v2_token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
 
-assert len(users["items"]) == 11
+apiB = arvados.api(host=j["arvados_api_hosts"][1], token=v2_token, insecure=True)
+apiC = arvados.api(host=j["arvados_api_hosts"][2], token=v2_token, insecure=True)
 
+###
+### Check users on API server "A" (the LoginCluster) ###
+###
 by_username = {}
+def check_A(users):
+    assert len(users["items"]) == 11
+
+    for i in range(1, 10):
+        found = False
+        for u in users["items"]:
+            if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and u["first_name"] == ("Case%d" % i) and u["last_name"] == "Testuser":
+                found = True
+                by_username[u["username"]] = u["uuid"]
+        assert found
+
+    # Should be active
+    for i in (1, 2, 3, 4, 5, 6, 7, 8):
+        found = False
+        for u in users["items"]:
+            if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and u["is_active"] is True:
+                found = True
+        assert found, "Not found case%i" % i
 
-for i in range(1, 10):
+    # case9 should not be active
     found = False
     for u in users["items"]:
-        if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i):
+        if (u["username"] == "case9" and u["email"] == "case9@test" and
+            u["uuid"] == by_username[u["username"]] and u["is_active"] is False):
             found = True
-            by_username[u["username"]] = u["uuid"]
     assert found
 
+users = apiA.users().list().execute()
+check_A(users)
+
+users = apiA.users().list(bypass_federation=True).execute()
+check_A(users)
+
+###
+### Check users on API server "B" (federation member) ###
+###
+
+# check for expected migrations on B
+users = apiB.users().list(bypass_federation=True).execute()
+assert len(users["items"]) == 11
+
+for i in range(2, 9):
+    found = False
+    for u in users["items"]:
+        if (u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and
+            u["first_name"] == ("Case%d" % i) and u["last_name"] == "Testuser" and
+            u["uuid"] == by_username[u["username"]] and u["is_active"] is True):
+            found = True
+    assert found, "Not found case%i" % i
+
 found = False
 for u in users["items"]:
     if (u["username"] == "case9" and u["email"] == "case9@test" and
+        u["first_name"] == "Case9" and u["last_name"] == "Testuser" and
         u["uuid"] == by_username[u["username"]] and u["is_active"] is False):
         found = True
 assert found
 
+# check that federated user listing works
 users = apiB.users().list().execute()
-assert len(users["items"]) == 11
+check_A(users)
 
-for i in range(2, 10):
-    found = False
-    for u in users["items"]:
-        if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and u["uuid"] == by_username[u["username"]]:
-            found = True
-    assert found
+###
+### Check users on API server "C" (federation member) ###
+###
 
-users = apiC.users().list().execute()
+# check for expected migrations on C
+users = apiC.users().list(bypass_federation=True).execute()
 assert len(users["items"]) == 8
 
 for i in (2, 4, 6, 7, 8):
     found = False
     for u in users["items"]:
-        if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and u["uuid"] == by_username[u["username"]]:
+        if (u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and
+            u["first_name"] == ("Case%d" % i) and u["last_name"] == "Testuser" and
+            u["uuid"] == by_username[u["username"]] and u["is_active"] is True):
             found = True
     assert found
 
@@ -54,8 +103,14 @@ for i in (2, 4, 6, 7, 8):
 for i in (3, 5, 9):
     found = False
     for u in users["items"]:
-        if u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and u["uuid"] == by_username[u["username"]]:
+        if (u["username"] == ("case%d" % i) and u["email"] == ("case%d@test" % i) and
+            u["first_name"] == ("Case%d" % i) and u["last_name"] == "Testuser" and
+            u["uuid"] == by_username[u["username"]] and u["is_active"] is True):
             found = True
     assert not found
 
+# check that federated user listing works
+users = apiC.users().list().execute()
+check_A(users)
+
 print("Passed checks")