Merge branch '21535-multi-wf-delete'
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
index 829d0d8990c206b3ede7d03ae6434423f32ddfc6..770e1609db6ec60dc39567678986c53f3a6f5a35 100755 (executable)
 #
 # SPDX-License-Identifier: Apache-2.0
 
+#
+# Migration tool for merging user accounts belonging to the same user
+# but on separate clusters to use a single user account managed by a
+# specific cluster.
+#
+# If you're working on this, see
+# arvados/sdk/python/tests/fed-migrate/README for information about
+# the testing infrastructure.
+
 import arvados
 import arvados.util
+import arvados.errors
 import csv
 import sys
 import argparse
 import hmac
+import urllib.parse
+import os
+import hashlib
+import re
+from arvados._version import __version__
+from . import _util as arv_cmd
 
-def main():
-
-    parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html')
-    parser.add_argument('--tokens', type=str, required=True)
-    group = parser.add_mutually_exclusive_group(required=True)
-    group.add_argument('--report', type=str, help="Generate report .csv file listing users by email address and their associated Arvados accounts")
-    group.add_argument('--migrate', type=str, help="Consume report .csv and migrate users to designated Arvados accounts")
-    group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected")
-    args = parser.parse_args()
+EMAIL=0
+USERNAME=1
+UUID=2
+HOMECLUSTER=3
 
+def connect_clusters(args):
     clusters = {}
+    errors = []
+    loginCluster = None
+    if args.tokens:
+        print("Reading %s" % args.tokens)
+        with open(args.tokens, "rt") as f:
+            for r in csv.reader(f):
+                if len(r) != 2:
+                    continue
+                host = r[0]
+                token = r[1]
+                print("Contacting %s" % (host))
+                arv = arvados.api(host=host, token=token, cache=False, num_retries=args.retries)
+                clusters[arv._rootDesc["uuidPrefix"]] = arv
+    else:
+        arv = arvados.api(cache=False, num_retries=args.retries)
+        rh = arv._rootDesc["remoteHosts"]
+        tok = arv.api_client_authorizations().current().execute()
+        token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
 
-    print("Reading %s" % args.tokens)
-    with open(args.tokens, "rt") as f:
-        for r in csv.reader(f):
-            host = r[0]
-            token = r[1]
-            print("Contacting %s" % (host))
-            arv = arvados.api(host=host, token=token)
-            clusters[arv._rootDesc["uuidPrefix"]] = arv
-            cur = arv.users().current().execute()
-            if not cur["is_admin"]:
-                raise Exception("Not admin of %s" % host)
+        for k,v in rh.items():
+            arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
+            clusters[k] = arv
+
+    for _, arv in clusters.items():
+        config = arv.configs().get().execute()
+        if config["Login"]["LoginCluster"] != "" and loginCluster is None:
+            loginCluster = config["Login"]["LoginCluster"]
 
     print("Checking that the federation is well connected")
-    fail = False
-    for v in clusters.values():
+    for arv in clusters.values():
+        config = arv.configs().get().execute()
+        if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
+            errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
+            continue
+
+        if arv._rootDesc["revision"] < "20200331":
+            errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
+            continue
+
+        try:
+            cur = arv.users().current().execute()
+        except arvados.errors.ApiError as e:
+            errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
+            continue
+
+        if not cur["is_admin"]:
+            errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
+            continue
+
         for r in clusters:
-            if r != v._rootDesc["uuidPrefix"] and r not in v._rootDesc["remoteHosts"]:
-                print("%s is missing from remoteHosts on %s" % (r, v._rootDesc["uuidPrefix"]))
-                fail = True
+            if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
+                errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
+        for r in arv._rootDesc["remoteHosts"]:
+            if r != "*" and r not in clusters:
+                print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
 
-    if fail:
-        exit(1)
+    return clusters, errors, loginCluster
 
-    if args.check:
-        exit(0)
 
-    if args.report:
-        users = []
-        for c, arv in clusters.items():
-            print("Getting user list from %s" % c)
-            ul = arvados.util.list_all(arv.users().list)
-            for l in ul:
-                if l["uuid"].startswith(c):
-                    users.append(l)
+def fetch_users(clusters, loginCluster):
+    rows = []
+    by_email = {}
+    by_username = {}
 
-        out = csv.writer(open(args.report, "wt"))
+    users = [
+        user
+        for prefix, arv in clusters.items()
+        for user in arvados.util.keyset_list_all(arv.users().list, bypass_federation=True)
+        if user['uuid'].startswith(prefix)
+    ]
 
-        out.writerow(("email", "user uuid", "primary cluster/user"))
+    # Users list is sorted by email
+    # Go through users and collect users with same email
+    # when we see a different email (or get to the end)
+    # call add_accum_rows() to generate the report rows with
+    # the "home cluster" set, and also fill in the by_email table.
 
-        users = sorted(users, key=lambda u: u["email"]+"::"+u["uuid"])
+    users.sort(key=lambda u: (u["email"], u["username"] or "", u["uuid"]))
 
-        accum = []
-        lastemail = None
-        for u in users:
-            if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
-                continue
-            if lastemail == None:
-                lastemail = u["email"]
-            if u["email"] == lastemail:
-                accum.append(u)
-            else:
-                homeuuid = None
-                for a in accum:
-                    if homeuuid is None:
-                        homeuuid = a["uuid"]
-                    if a["uuid"] != homeuuid:
-                        homeuuid = ""
-                for a in accum:
-                    out.writerow((a["email"], a["uuid"], homeuuid[0:5]))
-                lastemail = u["email"]
-                accum = [u]
+    accum = []
+    lastemail = None
 
+    def add_accum_rows():
         homeuuid = None
         for a in accum:
-            if homeuuid is None:
-                homeuuid = a["uuid"]
-            if a["uuid"] != homeuuid:
-                homeuuid = ""
+            uuids = set(a["uuid"] for a in accum)
+            homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
         for a in accum:
-            out.writerow((a["email"], a["uuid"], homeuuid[0:5]))
+            r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
+            by_email.setdefault(a["email"], {})
+            by_email[a["email"]][a["uuid"]] = r
+            homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
+            if homeuuid_and_username not in by_username:
+                by_username[homeuuid_and_username] = a["email"]
+            elif by_username[homeuuid_and_username] != a["email"]:
+                print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
+                exit(1)
+            rows.append(r)
 
-        print("Wrote %s" % args.report)
+    for u in users:
+        if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
+            continue
+        if lastemail == None:
+            lastemail = u["email"]
+        if u["email"] == lastemail:
+            accum.append(u)
+        else:
+            add_accum_rows()
+            lastemail = u["email"]
+            accum = [u]
 
-    if args.migrate:
-        rows = []
-        by_email = {}
-        with open(args.migrate, "rt") as f:
-            for r in csv.reader(f):
-                if r[0] == "email":
-                    continue
-                by_email.setdefault(r[0], [])
-                by_email[r[0]].append(r)
-                rows.append(r)
-        for r in rows:
-            if r[2] == "":
-                print("(%s) Skipping %s, no home cluster specified" % (r[0], r[1]))
-            if r[1].startswith(r[2]):
-                continue
-            candidates = []
-            for b in by_email[r[0]]:
-                if b[1].startswith(r[2]):
-                    candidates.append(b)
-            if len(candidates) == 0:
-                print("(%s) No user listed to migrate %s to %s" % (r[0], r[1], r[2]))
-                continue
-            if len(candidates) > 1:
-                print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (r[0], r[1], r[2]))
+    add_accum_rows()
+
+    return rows, by_email, by_username
+
+
+def read_migrations(args, by_email, by_username):
+    rows = []
+    with open(args.migrate or args.dry_run, "rt") as f:
+        for r in csv.reader(f):
+            if r[EMAIL] == "email":
                 continue
-            new_user_uuid = candidates[0][1]
-            print("(%s) Migrating %s to %s" % (r[0], r[1], new_user_uuid))
-            oldcluster = r[1][0:5]
-            newhomecluster = r[2][0:5]
-            homearv = clusters[newhomecluster]
-            # create a token
-            newtok = homearv.api_client_authorizations().create(body={"api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
-            salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(), msg=oldcluster.encode(), digestmod='sha1').hexdigest()
-            arvados.api(host=arv._rootDesc["rootUrl"][8:-1], token=salted).users().current().execute()
-
-            # now migrate from local user to remote user.
-            arv = clusters[oldcluster]
-
-            grp = arv.groups().create(body={
+            by_email.setdefault(r[EMAIL], {})
+            by_email[r[EMAIL]][r[UUID]] = r
+
+            homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
+            if homeuuid_and_username not in by_username:
+                by_username[homeuuid_and_username] = r[EMAIL]
+            elif by_username[homeuuid_and_username] != r[EMAIL]:
+                print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
+                exit(1)
+
+            rows.append(r)
+    return rows
+
+def update_username(args, email, user_uuid, username, migratecluster, migratearv):
+    print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
+    if args.dry_run:
+        return
+    try:
+        conflicts = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()
+        if conflicts["items"]:
+            # There's already a user with the username, move the old user out of the way
+            migratearv.users().update(uuid=conflicts["items"][0]["uuid"],
+                                        bypass_federation=True,
+                                        body={"user": {"username": username+"migrate"}}).execute()
+        migratearv.users().update(uuid=user_uuid,
+                                    bypass_federation=True,
+                                    body={"user": {"username": username}}).execute()
+    except arvados.errors.ApiError as e:
+        print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
+
+
+def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
+    candidates = []
+    conflict = False
+    for b in by_email[email].values():
+        if b[2].startswith(userhome):
+            candidates.append(b)
+        if b[1] != username and b[3] == userhome:
+            print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
+            conflict = True
+            break
+    if conflict:
+        return None
+    if len(candidates) == 0:
+        if len(userhome) == 5 and userhome not in clusters:
+            print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
+            return None
+        print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
+        if not args.dry_run:
+            oldhomecluster = old_user_uuid[0:5]
+            oldhomearv = clusters[oldhomecluster]
+            newhomecluster = userhome[0:5]
+            homearv = clusters[userhome]
+            user = None
+            try:
+                olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
+                conflicts = homearv.users().list(filters=[["username", "=", username]],
+                                                 bypass_federation=True).execute()
+                if conflicts["items"]:
+                    homearv.users().update(
+                        uuid=conflicts["items"][0]["uuid"],
+                        bypass_federation=True,
+                        body={"user": {"username": username+"migrate"}}).execute()
+                user = homearv.users().create(
+                    body={"user": {
+                        "email": email,
+                        "first_name": olduser["first_name"],
+                        "last_name": olduser["last_name"],
+                        "username": username,
+                        "is_active": olduser["is_active"]}}).execute()
+            except arvados.errors.ApiError as e:
+                print("(%s) Could not create user: %s" % (email, str(e)))
+                return None
+
+            tup = (email, username, user["uuid"], userhome)
+        else:
+            # dry run
+            tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
+        by_email[email][tup[2]] = tup
+        candidates.append(tup)
+    if len(candidates) > 1:
+        print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
+        return None
+    return candidates[0][2]
+
+
+def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
+    # create a token for the new user and salt it for the
+    # migration cluster, then use it to access the migration
+    # cluster as the new user once before merging to ensure
+    # the new user is known on that cluster.
+    migratecluster = migratearv._rootDesc["uuidPrefix"]
+    try:
+        if not args.dry_run:
+            newtok = homearv.api_client_authorizations().create(body={
+                "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
+        else:
+            newtok = {"uuid": "dry-run", "api_token": "12345"}
+    except arvados.errors.ApiError as e:
+        print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
+        return None
+
+    try:
+        findolduser = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()
+        if len(findolduser["items"]) == 0:
+            return False
+        if len(findolduser["items"]) == 1:
+            olduser = findolduser["items"][0]
+        else:
+            print("(%s) Unexpected result" % (email))
+            return None
+    except arvados.errors.ApiError as e:
+        print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
+        return None
+
+    salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
+                                                     msg=migratecluster.encode(),
+                                                     digestmod=hashlib.sha1).hexdigest()
+    try:
+        ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
+        if not args.dry_run:
+            newuser = arvados.api(host=ru.netloc, token=salted,
+                                  insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
+        else:
+            newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
+    except arvados.errors.ApiError as e:
+        print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
+        return None
+
+    if not newuser["is_active"] and olduser["is_active"]:
+        print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
+        try:
+            if not args.dry_run:
+                migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
+                                          body={"is_active": True}).execute()
+        except arvados.errors.ApiError as e:
+            print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
+            return None
+
+    if olduser["is_admin"] and not newuser["is_admin"]:
+        print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s. Please ensure the user admin status is the same on both clusters. Note that a federated admin account has admin privileges on the entire federation." % (email, old_user_uuid, new_user_uuid, migratecluster))
+        return None
+
+    return newuser
+
+def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
+    if args.dry_run:
+        return
+    try:
+        new_owner_uuid = new_user_uuid
+        if args.data_into_subproject:
+            grp = migratearv.groups().create(body={
                 "owner_uuid": new_user_uuid,
-                "name": "Migrated from %s (%s)" % (r[0], r[1]),
+                "name": "Migrated from %s (%s)" % (email, old_user_uuid),
                 "group_class": "project"
             }, ensure_unique_name=True).execute()
-            arv.users().merge(old_user_uuid=r[1],
-                              new_user_uuid=new_user_uuid,
-                              new_owner_uuid=grp["uuid"],
-                              redirect_to_new_user=True).execute()
+            new_owner_uuid = grp["uuid"]
+        migratearv.users().merge(old_user_uuid=old_user_uuid,
+                                    new_user_uuid=new_user_uuid,
+                                    new_owner_uuid=new_owner_uuid,
+                                    redirect_to_new_user=True).execute()
+    except arvados.errors.ApiError as e:
+        name_collision = re.search(r'Key \(owner_uuid, name\)=\((.*?), (.*?)\) already exists\.\n.*UPDATE "(.*?)"', e._get_reason())
+        if name_collision:
+            target_owner, rsc_name, rsc_type = name_collision.groups()
+            print("(%s) Cannot migrate to %s because both origin and target users have a %s named '%s'. Please rename the conflicting items or use --data-into-subproject to migrate all users' data into a special subproject." % (email, target_owner, rsc_type[:-1], rsc_name))
+        else:
+            print("(%s) Skipping user migration because of error: %s" % (email, e))
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html',
+        parents=[arv_cmd.retry_opt],
+    )
+    parser.add_argument(
+        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
+        help='Print version and exit.')
+    parser.add_argument('--tokens', type=str, metavar='FILE', required=False, help="Read tokens from FILE. Not needed when using LoginCluster.")
+    parser.add_argument('--data-into-subproject', action="store_true", help="Migrate user's data into a separate subproject. This can be used to avoid name collisions from within an account.")
+    group = parser.add_mutually_exclusive_group(required=True)
+    group.add_argument('--report', type=str, metavar='FILE', help="Generate report .csv file listing users by email address and their associated Arvados accounts.")
+    group.add_argument('--migrate', type=str, metavar='FILE', help="Consume report .csv and migrate users to designated Arvados accounts.")
+    group.add_argument('--dry-run', type=str, metavar='FILE', help="Consume report .csv and report how user would be migrated to designated Arvados accounts.")
+    group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected.")
+    args = parser.parse_args()
+
+    clusters, errors, loginCluster = connect_clusters(args)
+
+    if errors:
+        for e in errors:
+            print("ERROR: "+str(e))
+        exit(1)
+
+    if args.check:
+        print("Tokens file passed checks")
+        exit(0)
+
+    rows, by_email, by_username = fetch_users(clusters, loginCluster)
+
+    if args.report:
+        out = csv.writer(open(args.report, "wt"))
+        out.writerow(("email", "username", "user uuid", "home cluster"))
+        for r in rows:
+            out.writerow(r)
+        print("Wrote %s" % args.report)
+        return
+
+    if args.migrate or args.dry_run:
+        if args.dry_run:
+            print("Performing dry run")
+
+        rows = read_migrations(args, by_email, by_username)
+
+        for r in rows:
+            email = r[EMAIL]
+            username = r[USERNAME]
+            old_user_uuid = r[UUID]
+            userhome = r[HOMECLUSTER]
+
+            if userhome == "":
+                print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
+            if old_user_uuid.startswith(userhome):
+                migratecluster = old_user_uuid[0:5]
+                migratearv = clusters[migratecluster]
+                if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
+                    update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
+                continue
+
+            new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
+            if new_user_uuid is None:
+                continue
+
+            remote_users = {}
+            got_error = False
+            for migratecluster in clusters:
+                # cluster where the migration is happening
+                migratearv = clusters[migratecluster]
+
+                # the user's new home cluster
+                newhomecluster = userhome[0:5]
+                homearv = clusters[newhomecluster]
+
+                newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
+                if newuser is None:
+                    got_error = True
+                remote_users[migratecluster] = newuser
+
+            if not got_error:
+                for migratecluster in clusters:
+                    migratearv = clusters[migratecluster]
+                    newuser = remote_users[migratecluster]
+                    if newuser is False:
+                        continue
+
+                    print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
+
+                    migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
+
+                    if newuser['username'] != username:
+                        update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
 
 if __name__ == "__main__":
     main()