#
# SPDX-License-Identifier: Apache-2.0
+#
+# Migration tool for merging user accounts belonging to the same user
+# but on separate clusters to use a single user account managed by a
+# specific cluster.
+#
+# If you're working on this, see
+# arvados/sdk/python/tests/fed-migrate/README for information about
+# the testing infrastructure.
+
import arvados
import arvados.util
+import arvados.errors
import csv
import sys
import argparse
import hmac
+import urllib.parse
+import os
+import hashlib
+from arvados._version import __version__
-def main():
-
- parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/???')
- parser.add_argument('--tokens', type=str, required=True)
- group = parser.add_mutually_exclusive_group(required=True)
- group.add_argument('--report', type=str)
- group.add_argument('--migrate', type=str)
- args = parser.parse_args()
# Column positions within a row of the report/migration CSV, in the same
# order as the header written by --report:
# "email", "username", "user uuid", "home cluster".
EMAIL = 0
USERNAME = 1
UUID = 2
HOMECLUSTER = 3
def connect_clusters(args):
    """Create an API client per cluster and sanity-check the federation.

    If ``args.tokens`` is set it names a CSV file of ``host,token`` rows;
    rows that do not have exactly two columns are skipped.  Otherwise a
    client is built from the default configuration and one more client is
    created for every entry of its ``remoteHosts``, all using the current
    token in ``v2/<uuid>/<api_token>`` form.

    Every cluster is then checked for: a consistent ``Login.LoginCluster``
    setting, a new enough API revision, a usable token, admin rights, and
    the presence of every other connected cluster in its ``remoteHosts``.

    Returns a ``(clusters, errors, loginCluster)`` tuple where ``clusters``
    maps cluster id to API client, ``errors`` is a list of problem
    descriptions (empty when every check passed) and ``loginCluster`` is the
    first non-empty ``Login.LoginCluster`` value found, or ``None``.
    """
    clusters = {}
    errors = []
    loginCluster = None

    if args.tokens:
        print("Reading %s" % args.tokens)
        with open(args.tokens, "rt") as f:
            for row in csv.reader(f):
                if len(row) != 2:
                    continue
                host, token = row
                print("Contacting %s" % (host))
                client = arvados.api(host=host, token=token, cache=False)
                clusters[client._rootDesc["uuidPrefix"]] = client
    else:
        local = arvados.api(cache=False)
        remote_hosts = local._rootDesc["remoteHosts"]
        current = local.api_client_authorizations().current().execute()
        token = "v2/%s/%s" % (current["uuid"], current["api_token"])

        for cluster_id, host in remote_hosts.items():
            clusters[cluster_id] = arvados.api(
                host=host, token=token, cache=False,
                insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))

    # The first cluster that names a login cluster decides the expected value.
    for client in clusters.values():
        config = client.configs().get().execute()
        if loginCluster is None and config["Login"]["LoginCluster"] != "":
            loginCluster = config["Login"]["LoginCluster"]

    print("Checking that the federation is well connected")
    for client in clusters.values():
        config = client.configs().get().execute()

        # The login cluster itself is allowed to have a different setting.
        if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
            errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
            continue

        if client._rootDesc["revision"] < "20190926":
            errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 1.5 before running migration." % config["ClusterID"])
            continue

        try:
            me = client.users().current().execute()
        except arvados.errors.ApiError as e:
            errors.append("checking token for %s %s" % (client._rootDesc["rootUrl"], e))
            continue

        if not me["is_admin"]:
            errors.append("User %s is not admin on %s" % (me["uuid"], client._rootDesc["uuidPrefix"]))
            continue

        prefix = client._rootDesc["uuidPrefix"]
        federated = client._rootDesc["remoteHosts"]
        for other in clusters:
            if other != prefix and other not in federated:
                errors.append("%s is missing from remoteHosts of %s" % (other, prefix))
        for other in federated:
            if other != "*" and other not in clusters:
                print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (prefix, other, other))

    return clusters, errors, loginCluster
- users = sorted(users, key=lambda u: u["email"]+"::"+u["uuid"])
- accum = []
- lastemail = None
- for u in users:
- if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
- continue
- if lastemail == None:
- lastemail = u["email"]
- if u["email"] == lastemail:
- accum.append(u)
- else:
- homeuuid = None
- for a in accum:
- if homeuuid is None:
- homeuuid = a["uuid"]
- if a["uuid"] != homeuuid:
- homeuuid = ""
- for a in accum:
- out.writerow((a["email"], a["uuid"], homeuuid[0:5]))
- lastemail = u["email"]
- accum = [u]
def fetch_users(clusters, loginCluster):
    """List the local users of every cluster and build the report rows.

    Only accounts whose uuid starts with the id of the cluster they were
    listed from are kept.  Accounts whose uuid ends in ``-anonymouspublic``
    or ``-000000000000000`` are left out of the report.

    Each row is a tuple ``(email, username, uuid, home cluster)``.  The home
    cluster is ``loginCluster`` when that is set; otherwise it is the first
    five characters of the uuid when the email address has exactly one
    account, and ``""`` when it has several.

    Returns ``(rows, by_email, by_username)``:
      * ``rows`` -- the report rows, ordered by email, username, uuid;
      * ``by_email`` -- ``{email: {uuid: row}}``;
      * ``by_username`` -- ``{"<home cluster>::<username>": email}``.

    Exits the process with status 1 if one username would belong to two
    different email addresses on the same home cluster.
    """
    rows = []
    by_email = {}
    by_username = {}

    users = []
    for c, arv in clusters.items():
        print("Getting user list from %s" % c)
        users.extend(u for u in arvados.util.list_all(arv.users().list)
                     if u["uuid"].startswith(c))

    # Sorting by email puts all accounts of one address next to each other,
    # so a group ends as soon as the email changes.
    users.sort(key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])

    def add_accum_rows(accum):
        # Emit the rows for one group of accounts sharing an email address.
        if not accum:
            return
        # Computed once per group (it does not depend on the element).
        uuids = {a["uuid"] for a in accum}
        homeuuid = uuids.pop() if len(uuids) == 1 else ""
        for a in accum:
            r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
            by_email.setdefault(a["email"], {})[a["uuid"]] = r
            homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
            if homeuuid_and_username not in by_username:
                by_username[homeuuid_and_username] = a["email"]
            elif by_username[homeuuid_and_username] != a["email"]:
                print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
                sys.exit(1)
            rows.append(r)

    accum = []
    lastemail = None
    for u in users:
        if u["uuid"].endswith(("-anonymouspublic", "-000000000000000")):
            continue
        if lastemail is None:
            lastemail = u["email"]
        if u["email"] == lastemail:
            accum.append(u)
        else:
            add_accum_rows(accum)
            lastemail = u["email"]
            accum = [u]

    # Flush the final group (a no-op when there were no users at all).
    add_accum_rows(accum)

    return rows, by_email, by_username
+
+
def read_migrations(args, by_email, by_username):
    """Read the migration plan CSV named by ``args.migrate`` or ``args.dry_run``.

    The header row (first column equal to ``"email"``) and blank lines are
    skipped.  Every other row must have at least the four report columns
    (email, username, user uuid, home cluster).

    Each row is recorded in ``by_email`` as ``by_email[email][uuid] = row``
    and its username is registered in ``by_username`` under the key
    ``"<home cluster>::<username>"``.  Both dicts are modified in place.

    Returns the list of data rows in file order.

    Exits the process with status 1 if a row is too short, or if the same
    username is listed for two different email addresses on one home cluster.
    """
    rows = []
    path = args.migrate or args.dry_run
    # newline="" lets the csv module do its own newline handling.
    with open(path, "rt", newline="") as f:
        reader = csv.reader(f)
        for r in reader:
            if not r:
                # csv.reader yields an empty list for a blank line.
                continue
            if r[EMAIL] == "email":
                continue
            if len(r) <= HOMECLUSTER:
                print("ERROR: line %d of %s has %d column(s), expected email, username, user uuid, home cluster" % (reader.line_num, path, len(r)))
                sys.exit(1)

            by_email.setdefault(r[EMAIL], {})[r[UUID]] = r

            homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
            if homeuuid_and_username not in by_username:
                by_username[homeuuid_and_username] = r[EMAIL]
            elif by_username[homeuuid_and_username] != r[EMAIL]:
                print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
                sys.exit(1)

            rows.append(r)
    return rows
+
def update_username(args, email, user_uuid, username, migratecluster, migratearv):
    """Set the username of ``user_uuid`` to ``username`` on one cluster.

    The intended change is always announced on stdout.  With
    ``args.dry_run`` set nothing else happens.  Otherwise, if another
    account on ``migratearv`` already holds ``username``, that account is
    first renamed to ``username + "migrate"`` to free the name.  API errors
    are reported on stdout and not raised.
    """
    print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
    if args.dry_run:
        return

    try:
        holders = migratearv.users().list(filters=[["username", "=", username]]).execute()
        if holders["items"]:
            # Move the current holder of the name out of the way.
            displaced = holders["items"][0]["uuid"]
            migratearv.users().update(uuid=displaced, body={"user": {"username": username+"migrate"}}).execute()
        migratearv.users().update(uuid=user_uuid, body={"user": {"username": username}}).execute()
    except arvados.errors.ApiError as e:
        print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
+
+
def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
    """Pick (or create) the account that ``old_user_uuid`` is migrated to.

    ``userhome`` is the "home cluster" column of the plan: either a
    five-character cluster id or a longer uuid prefix / full uuid used to
    disambiguate between several accounts.

    Candidates are the accounts listed under ``email`` whose uuid starts
    with ``userhome``.  If there is none, a new user is created on the home
    cluster (or, with ``args.dry_run``, a placeholder uuid
    ``<cluster>-tpzed-xfakexfakexfake`` is used) and recorded in
    ``by_email``.

    Returns the chosen user uuid, or ``None`` (after printing the reason)
    when the usernames listed for that home conflict, the home cluster is
    unknown, the user could not be created, or several candidates match.
    """
    candidates = []
    for b in by_email[email].values():
        if b[UUID].startswith(userhome):
            candidates.append(b)
        if b[USERNAME] != username and b[HOMECLUSTER] == userhome:
            print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[USERNAME], username))
            return None

    # userhome may be longer than a cluster id; clients are keyed by the
    # five-character cluster id only.
    newhomecluster = userhome[0:5]

    if not candidates:
        if newhomecluster not in clusters:
            print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
            return None
        print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
        if not args.dry_run:
            oldhomearv = clusters[old_user_uuid[0:5]]
            homearv = clusters[newhomecluster]
            try:
                olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
                conflicts = homearv.users().list(filters=[["username", "=", username]]).execute()
                if conflicts["items"]:
                    # Free the username by renaming whoever holds it now.
                    homearv.users().update(uuid=conflicts["items"][0]["uuid"], body={"user": {"username": username+"migrate"}}).execute()
                user = homearv.users().create(body={"user": {"email": email, "username": username, "is_active": olduser["is_active"]}}).execute()
            except arvados.errors.ApiError as e:
                print("(%s) Could not create user: %s" % (email, str(e)))
                return None

            tup = (email, username, user["uuid"], userhome)
        else:
            # dry run
            tup = (email, username, "%s-tpzed-xfakexfakexfake" % newhomecluster, userhome)
        by_email[email][tup[UUID]] = tup
        candidates.append(tup)

    if len(candidates) > 1:
        print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
        return None
    return candidates[0][UUID]
+
+
def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
    """Make sure the new user is known (and usable) on the migration cluster.

    A token is created for ``new_user_uuid`` on ``homearv``, salted for the
    migration cluster, and used once against ``migratearv``'s root URL so
    that cluster learns about the new user before the merge.  If the old
    user is active and the new one is not, the new user is activated on the
    migration cluster.

    With ``args.dry_run`` no token is created and nothing is updated; a
    placeholder user record is returned instead (active, not admin, with the
    local part of ``email`` as username).

    Returns the new user's record as seen by the migration cluster, or
    ``None`` when the migration of this user on this cluster must be
    skipped: an API call failed, the old user does not exist there, or the
    old user is admin while the new user is not.
    """
    # create a token for the new user and salt it for the
    # migration cluster, then use it to access the migration
    # cluster as the new user once before merging to ensure
    # the new user is known on that cluster.
    migratecluster = migratearv._rootDesc["uuidPrefix"]
    try:
        if not args.dry_run:
            newtok = homearv.api_client_authorizations().create(body={
                "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
        else:
            newtok = {"uuid": "dry-run", "api_token": "12345"}
    except arvados.errors.ApiError as e:
        print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
        return None

    try:
        olduser = migratearv.users().get(uuid=old_user_uuid).execute()
    except arvados.errors.ApiError as e:
        # A 404 means there is no such user on this cluster, so there is
        # nothing to migrate here; any other failure is worth reporting.
        # Either way there is no old user record to continue with.
        if e.resp.status != 404:
            print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
        return None

    salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
                                                     msg=migratecluster.encode(),
                                                     digestmod=hashlib.sha1).hexdigest()
    try:
        ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
        if not args.dry_run:
            newuser = arvados.api(host=ru.netloc, token=salted, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
        else:
            # Placeholder carrying every key that is read below and by callers.
            newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
    except arvados.errors.ApiError as e:
        print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
        return None

    if not newuser["is_active"] and olduser["is_active"]:
        print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
        try:
            if not args.dry_run:
                migratearv.users().update(uuid=new_user_uuid, body={"is_active": True}).execute()
        except arvados.errors.ApiError as e:
            print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
            return None

    if olduser["is_admin"] and not newuser["is_admin"]:
        print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
        return None

    return newuser
+
def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
    """Merge ``old_user_uuid`` into ``new_user_uuid`` on one cluster.

    A project named after the old account is created under the new user,
    then the users ``merge`` call is issued with that project as
    ``new_owner_uuid`` and ``redirect_to_new_user=True``.  Nothing is done
    when ``args.dry_run`` is set.  API errors are printed, not raised.
    """
    if args.dry_run:
        return

    project = {
        "owner_uuid": new_user_uuid,
        "name": "Migrated from %s (%s)" % (email, old_user_uuid),
        "group_class": "project"
    }
    try:
        grp = migratearv.groups().create(body=project, ensure_unique_name=True).execute()
        merge = migratearv.users().merge(old_user_uuid=old_user_uuid,
                                         new_user_uuid=new_user_uuid,
                                         new_owner_uuid=grp["uuid"],
                                         redirect_to_new_user=True)
        merge.execute()
    except arvados.errors.ApiError as e:
        print("(%s) Error migrating user: %s" % (email, e))
+
+
def main():
    """Command-line entry point.

    Exactly one mode must be given:
      --check    connect to the clusters, run the federation checks, exit;
      --report   write a CSV of users (email, username, user uuid,
                 home cluster) to the given file;
      --migrate  read such a CSV and migrate users accordingly;
      --dry-run  like --migrate, but only print what would be done.

    Exits with status 1 when the cluster checks report any error.
    """
    parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html')
    parser.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    parser.add_argument('--tokens', type=str, required=False)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--report', type=str, help="Generate report .csv file listing users by email address and their associated Arvados accounts")
    group.add_argument('--migrate', type=str, help="Consume report .csv and migrate users to designated Arvados accounts")
    group.add_argument('--dry-run', type=str, help="Consume report .csv and report how user would be migrated to designated Arvados accounts")
    group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected")
    args = parser.parse_args()

    clusters, errors, loginCluster = connect_clusters(args)

    if errors:
        for e in errors:
            print("ERROR: "+str(e))
        sys.exit(1)

    if args.check:
        print("Tokens file passed checks")
        sys.exit(0)

    rows, by_email, by_username = fetch_users(clusters, loginCluster)

    if args.report:
        # The context manager guarantees the report is flushed and closed;
        # newline="" lets the csv module control line endings.
        with open(args.report, "wt", newline="") as f:
            out = csv.writer(f)
            out.writerow(("email", "username", "user uuid", "home cluster"))
            out.writerows(rows)
        print("Wrote %s" % args.report)
        return

    if args.migrate or args.dry_run:
        if args.dry_run:
            print("Performing dry run")

        rows = read_migrations(args, by_email, by_username)

        for r in rows:
            email = r[EMAIL]
            username = r[USERNAME]
            old_user_uuid = r[UUID]
            userhome = r[HOMECLUSTER]

            if userhome == "":
                print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
                # Really skip: every uuid "starts with" the empty string, so
                # without this the row would be handled as already-at-home.
                continue

            if old_user_uuid.startswith(userhome):
                # Already on its home cluster; only the username may need
                # to be brought in line with the plan.
                migratecluster = old_user_uuid[0:5]
                migratearv = clusters[migratecluster]
                if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
                    update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
                continue

            new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
            if new_user_uuid is None:
                continue

            # the user's new home cluster
            newhomecluster = userhome[0:5]
            homearv = clusters[newhomecluster]

            # cluster where the migration is happening
            for migratecluster, migratearv in clusters.items():
                newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
                if newuser is None:
                    continue

                print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))

                migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)

                if newuser['username'] != username:
                    update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
# Run the tool only when this file is executed as a script.
if __name__ == "__main__":
    main()