Merge branch '17830-reqid-header-propagation-fix' into main. Closes #17830
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 #
7 # Migration tool for merging user accounts belonging to the same user
8 # but on separate clusters to use a single user account managed by a
9 # specific cluster.
10 #
11 # If you're working on this, see
12 # arvados/sdk/python/tests/fed-migrate/README for information about
13 # the testing infrastructure.
14
15 import arvados
16 import arvados.util
17 import arvados.errors
18 import csv
19 import sys
20 import argparse
21 import hmac
22 import urllib.parse
23 import os
24 import hashlib
25 import re
26 from arvados._version import __version__
27
28 EMAIL=0
29 USERNAME=1
30 UUID=2
31 HOMECLUSTER=3
32
33 def connect_clusters(args):
34     clusters = {}
35     errors = []
36     loginCluster = None
37     if args.tokens:
38         print("Reading %s" % args.tokens)
39         with open(args.tokens, "rt") as f:
40             for r in csv.reader(f):
41                 if len(r) != 2:
42                     continue
43                 host = r[0]
44                 token = r[1]
45                 print("Contacting %s" % (host))
46                 arv = arvados.api(host=host, token=token, cache=False)
47                 clusters[arv._rootDesc["uuidPrefix"]] = arv
48     else:
49         arv = arvados.api(cache=False)
50         rh = arv._rootDesc["remoteHosts"]
51         tok = arv.api_client_authorizations().current().execute()
52         token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
53
54         for k,v in rh.items():
55             arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
56             clusters[k] = arv
57
58     for _, arv in clusters.items():
59         config = arv.configs().get().execute()
60         if config["Login"]["LoginCluster"] != "" and loginCluster is None:
61             loginCluster = config["Login"]["LoginCluster"]
62
63     print("Checking that the federation is well connected")
64     for arv in clusters.values():
65         config = arv.configs().get().execute()
66         if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
67             errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
68             continue
69
70         if arv._rootDesc["revision"] < "20200331":
71             errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
72             continue
73
74         try:
75             cur = arv.users().current().execute()
76         except arvados.errors.ApiError as e:
77             errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
78             continue
79
80         if not cur["is_admin"]:
81             errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
82             continue
83
84         for r in clusters:
85             if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
86                 errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
87         for r in arv._rootDesc["remoteHosts"]:
88             if r != "*" and r not in clusters:
89                 print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
90
91     return clusters, errors, loginCluster
92
93
94 def fetch_users(clusters, loginCluster):
95     rows = []
96     by_email = {}
97     by_username = {}
98
99     users = []
100     for c, arv in clusters.items():
101         print("Getting user list from %s" % c)
102         ul = arvados.util.list_all(arv.users().list, bypass_federation=True)
103         for l in ul:
104             if l["uuid"].startswith(c):
105                 users.append(l)
106
107     # Users list is sorted by email
108     # Go through users and collect users with same email
109     # when we see a different email (or get to the end)
110     # call add_accum_rows() to generate the report rows with
111     # the "home cluster" set, and also fill in the by_email table.
112
113     users = sorted(users, key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])
114
115     accum = []
116     lastemail = None
117
118     def add_accum_rows():
119         homeuuid = None
120         for a in accum:
121             uuids = set(a["uuid"] for a in accum)
122             homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
123         for a in accum:
124             r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
125             by_email.setdefault(a["email"], {})
126             by_email[a["email"]][a["uuid"]] = r
127             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
128             if homeuuid_and_username not in by_username:
129                 by_username[homeuuid_and_username] = a["email"]
130             elif by_username[homeuuid_and_username] != a["email"]:
131                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
132                 exit(1)
133             rows.append(r)
134
135     for u in users:
136         if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
137             continue
138         if lastemail == None:
139             lastemail = u["email"]
140         if u["email"] == lastemail:
141             accum.append(u)
142         else:
143             add_accum_rows()
144             lastemail = u["email"]
145             accum = [u]
146
147     add_accum_rows()
148
149     return rows, by_email, by_username
150
151
152 def read_migrations(args, by_email, by_username):
153     rows = []
154     with open(args.migrate or args.dry_run, "rt") as f:
155         for r in csv.reader(f):
156             if r[EMAIL] == "email":
157                 continue
158             by_email.setdefault(r[EMAIL], {})
159             by_email[r[EMAIL]][r[UUID]] = r
160
161             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
162             if homeuuid_and_username not in by_username:
163                 by_username[homeuuid_and_username] = r[EMAIL]
164             elif by_username[homeuuid_and_username] != r[EMAIL]:
165                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
166                 exit(1)
167
168             rows.append(r)
169     return rows
170
171 def update_username(args, email, user_uuid, username, migratecluster, migratearv):
172     print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
173     if args.dry_run:
174         return
175     try:
176         conflicts = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()
177         if conflicts["items"]:
178             # There's already a user with the username, move the old user out of the way
179             migratearv.users().update(uuid=conflicts["items"][0]["uuid"],
180                                         bypass_federation=True,
181                                         body={"user": {"username": username+"migrate"}}).execute()
182         migratearv.users().update(uuid=user_uuid,
183                                     bypass_federation=True,
184                                     body={"user": {"username": username}}).execute()
185     except arvados.errors.ApiError as e:
186         print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
187
188
189 def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
190     candidates = []
191     conflict = False
192     for b in by_email[email].values():
193         if b[2].startswith(userhome):
194             candidates.append(b)
195         if b[1] != username and b[3] == userhome:
196             print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
197             conflict = True
198             break
199     if conflict:
200         return None
201     if len(candidates) == 0:
202         if len(userhome) == 5 and userhome not in clusters:
203             print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
204             return None
205         print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
206         if not args.dry_run:
207             oldhomecluster = old_user_uuid[0:5]
208             oldhomearv = clusters[oldhomecluster]
209             newhomecluster = userhome[0:5]
210             homearv = clusters[userhome]
211             user = None
212             try:
213                 olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
214                 conflicts = homearv.users().list(filters=[["username", "=", username]],
215                                                  bypass_federation=True).execute()
216                 if conflicts["items"]:
217                     homearv.users().update(
218                         uuid=conflicts["items"][0]["uuid"],
219                         bypass_federation=True,
220                         body={"user": {"username": username+"migrate"}}).execute()
221                 user = homearv.users().create(
222                     body={"user": {
223                         "email": email,
224                         "first_name": olduser["first_name"],
225                         "last_name": olduser["last_name"],
226                         "username": username,
227                         "is_active": olduser["is_active"]}}).execute()
228             except arvados.errors.ApiError as e:
229                 print("(%s) Could not create user: %s" % (email, str(e)))
230                 return None
231
232             tup = (email, username, user["uuid"], userhome)
233         else:
234             # dry run
235             tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
236         by_email[email][tup[2]] = tup
237         candidates.append(tup)
238     if len(candidates) > 1:
239         print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
240         return None
241     return candidates[0][2]
242
243
244 def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
245     # create a token for the new user and salt it for the
246     # migration cluster, then use it to access the migration
247     # cluster as the new user once before merging to ensure
248     # the new user is known on that cluster.
249     migratecluster = migratearv._rootDesc["uuidPrefix"]
250     try:
251         if not args.dry_run:
252             newtok = homearv.api_client_authorizations().create(body={
253                 "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
254         else:
255             newtok = {"uuid": "dry-run", "api_token": "12345"}
256     except arvados.errors.ApiError as e:
257         print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
258         return None
259
260     try:
261         findolduser = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()
262         if len(findolduser["items"]) == 0:
263             return False
264         if len(findolduser["items"]) == 1:
265             olduser = findolduser["items"][0]
266         else:
267             print("(%s) Unexpected result" % (email))
268             return None
269     except arvados.errors.ApiError as e:
270         print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
271         return None
272
273     salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
274                                                      msg=migratecluster.encode(),
275                                                      digestmod=hashlib.sha1).hexdigest()
276     try:
277         ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
278         if not args.dry_run:
279             newuser = arvados.api(host=ru.netloc, token=salted,
280                                   insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
281         else:
282             newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
283     except arvados.errors.ApiError as e:
284         print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
285         return None
286
287     if not newuser["is_active"] and olduser["is_active"]:
288         print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
289         try:
290             if not args.dry_run:
291                 migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
292                                           body={"is_active": True}).execute()
293         except arvados.errors.ApiError as e:
294             print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
295             return None
296
297     if olduser["is_admin"] and not newuser["is_admin"]:
298         print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s. Please ensure the user admin status is the same on both clusters. Note that a federated admin account has admin privileges on the entire federation." % (email, old_user_uuid, new_user_uuid, migratecluster))
299         return None
300
301     return newuser
302
303 def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
304     if args.dry_run:
305         return
306     try:
307         new_owner_uuid = new_user_uuid
308         if args.data_into_subproject:
309             grp = migratearv.groups().create(body={
310                 "owner_uuid": new_user_uuid,
311                 "name": "Migrated from %s (%s)" % (email, old_user_uuid),
312                 "group_class": "project"
313             }, ensure_unique_name=True).execute()
314             new_owner_uuid = grp["uuid"]
315         migratearv.users().merge(old_user_uuid=old_user_uuid,
316                                     new_user_uuid=new_user_uuid,
317                                     new_owner_uuid=new_owner_uuid,
318                                     redirect_to_new_user=True).execute()
319     except arvados.errors.ApiError as e:
320         name_collision = re.search(r'Key \(owner_uuid, name\)=\((.*?), (.*?)\) already exists\.\n.*UPDATE "(.*?)"', e._get_reason())
321         if name_collision:
322             target_owner, rsc_name, rsc_type = name_collision.groups()
323             print("(%s) Cannot migrate to %s because both origin and target users have a %s named '%s'. Please rename the conflicting items or use --data-into-subproject to migrate all users' data into a special subproject." % (email, target_owner, rsc_type[:-1], rsc_name))
324         else:
325             print("(%s) Skipping user migration because of error: %s" % (email, e))
326
327
328 def main():
329     parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html')
330     parser.add_argument(
331         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
332         help='Print version and exit.')
333     parser.add_argument('--tokens', type=str, metavar='FILE', required=False, help="Read tokens from FILE. Not needed when using LoginCluster.")
334     parser.add_argument('--data-into-subproject', action="store_true", help="Migrate user's data into a separate subproject. This can be used to avoid name collisions from within an account.")
335     group = parser.add_mutually_exclusive_group(required=True)
336     group.add_argument('--report', type=str, metavar='FILE', help="Generate report .csv file listing users by email address and their associated Arvados accounts.")
337     group.add_argument('--migrate', type=str, metavar='FILE', help="Consume report .csv and migrate users to designated Arvados accounts.")
338     group.add_argument('--dry-run', type=str, metavar='FILE', help="Consume report .csv and report how user would be migrated to designated Arvados accounts.")
339     group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected.")
340     args = parser.parse_args()
341
342     clusters, errors, loginCluster = connect_clusters(args)
343
344     if errors:
345         for e in errors:
346             print("ERROR: "+str(e))
347         exit(1)
348
349     if args.check:
350         print("Tokens file passed checks")
351         exit(0)
352
353     rows, by_email, by_username = fetch_users(clusters, loginCluster)
354
355     if args.report:
356         out = csv.writer(open(args.report, "wt"))
357         out.writerow(("email", "username", "user uuid", "home cluster"))
358         for r in rows:
359             out.writerow(r)
360         print("Wrote %s" % args.report)
361         return
362
363     if args.migrate or args.dry_run:
364         if args.dry_run:
365             print("Performing dry run")
366
367         rows = read_migrations(args, by_email, by_username)
368
369         for r in rows:
370             email = r[EMAIL]
371             username = r[USERNAME]
372             old_user_uuid = r[UUID]
373             userhome = r[HOMECLUSTER]
374
375             if userhome == "":
376                 print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
377             if old_user_uuid.startswith(userhome):
378                 migratecluster = old_user_uuid[0:5]
379                 migratearv = clusters[migratecluster]
380                 if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
381                     update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
382                 continue
383
384             new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
385             if new_user_uuid is None:
386                 continue
387
388             remote_users = {}
389             got_error = False
390             for migratecluster in clusters:
391                 # cluster where the migration is happening
392                 migratearv = clusters[migratecluster]
393
394                 # the user's new home cluster
395                 newhomecluster = userhome[0:5]
396                 homearv = clusters[newhomecluster]
397
398                 newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
399                 if newuser is None:
400                     got_error = True
401                 remote_users[migratecluster] = newuser
402
403             if not got_error:
404                 for migratecluster in clusters:
405                     migratearv = clusters[migratecluster]
406                     newuser = remote_users[migratecluster]
407                     if newuser is False:
408                         continue
409
410                     print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
411
412                     migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
413
414                     if newuser['username'] != username:
415                         update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
416
417 if __name__ == "__main__":
418     main()