21720: added type assertions for AxiosInstance get
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 #
7 # Migration tool for merging user accounts belonging to the same user
8 # but on separate clusters to use a single user account managed by a
9 # specific cluster.
10 #
11 # If you're working on this, see
12 # arvados/sdk/python/tests/fed-migrate/README for information about
13 # the testing infrastructure.
14
15 import arvados
16 import arvados.util
17 import arvados.errors
18 import csv
19 import sys
20 import argparse
21 import hmac
22 import urllib.parse
23 import os
24 import hashlib
25 import re
26 from arvados._version import __version__
27 from . import _util as arv_cmd
28
29 EMAIL=0
30 USERNAME=1
31 UUID=2
32 HOMECLUSTER=3
33
34 def connect_clusters(args):
35     clusters = {}
36     errors = []
37     loginCluster = None
38     if args.tokens:
39         print("Reading %s" % args.tokens)
40         with open(args.tokens, "rt") as f:
41             for r in csv.reader(f):
42                 if len(r) != 2:
43                     continue
44                 host = r[0]
45                 token = r[1]
46                 print("Contacting %s" % (host))
47                 arv = arvados.api(host=host, token=token, cache=False, num_retries=args.retries)
48                 clusters[arv._rootDesc["uuidPrefix"]] = arv
49     else:
50         arv = arvados.api(cache=False, num_retries=args.retries)
51         rh = arv._rootDesc["remoteHosts"]
52         tok = arv.api_client_authorizations().current().execute()
53         token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
54
55         for k,v in rh.items():
56             arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
57             clusters[k] = arv
58
59     for _, arv in clusters.items():
60         config = arv.configs().get().execute()
61         if config["Login"]["LoginCluster"] != "" and loginCluster is None:
62             loginCluster = config["Login"]["LoginCluster"]
63
64     print("Checking that the federation is well connected")
65     for arv in clusters.values():
66         config = arv.configs().get().execute()
67         if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
68             errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
69             continue
70
71         if arv._rootDesc["revision"] < "20200331":
72             errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
73             continue
74
75         try:
76             cur = arv.users().current().execute()
77         except arvados.errors.ApiError as e:
78             errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
79             continue
80
81         if not cur["is_admin"]:
82             errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
83             continue
84
85         for r in clusters:
86             if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
87                 errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
88         for r in arv._rootDesc["remoteHosts"]:
89             if r != "*" and r not in clusters:
90                 print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
91
92     return clusters, errors, loginCluster
93
94
95 def fetch_users(clusters, loginCluster):
96     rows = []
97     by_email = {}
98     by_username = {}
99
100     users = [
101         user
102         for prefix, arv in clusters.items()
103         for user in arvados.util.keyset_list_all(arv.users().list, bypass_federation=True)
104         if user['uuid'].startswith(prefix)
105     ]
106
107     # Users list is sorted by email
108     # Go through users and collect users with same email
109     # when we see a different email (or get to the end)
110     # call add_accum_rows() to generate the report rows with
111     # the "home cluster" set, and also fill in the by_email table.
112
113     users.sort(key=lambda u: (u["email"], u["username"] or "", u["uuid"]))
114
115     accum = []
116     lastemail = None
117
118     def add_accum_rows():
119         homeuuid = None
120         for a in accum:
121             uuids = set(a["uuid"] for a in accum)
122             homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
123         for a in accum:
124             r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
125             by_email.setdefault(a["email"], {})
126             by_email[a["email"]][a["uuid"]] = r
127             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
128             if homeuuid_and_username not in by_username:
129                 by_username[homeuuid_and_username] = a["email"]
130             elif by_username[homeuuid_and_username] != a["email"]:
131                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
132                 exit(1)
133             rows.append(r)
134
135     for u in users:
136         if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
137             continue
138         if lastemail == None:
139             lastemail = u["email"]
140         if u["email"] == lastemail:
141             accum.append(u)
142         else:
143             add_accum_rows()
144             lastemail = u["email"]
145             accum = [u]
146
147     add_accum_rows()
148
149     return rows, by_email, by_username
150
151
152 def read_migrations(args, by_email, by_username):
153     rows = []
154     with open(args.migrate or args.dry_run, "rt") as f:
155         for r in csv.reader(f):
156             if r[EMAIL] == "email":
157                 continue
158             by_email.setdefault(r[EMAIL], {})
159             by_email[r[EMAIL]][r[UUID]] = r
160
161             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
162             if homeuuid_and_username not in by_username:
163                 by_username[homeuuid_and_username] = r[EMAIL]
164             elif by_username[homeuuid_and_username] != r[EMAIL]:
165                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
166                 exit(1)
167
168             rows.append(r)
169     return rows
170
171 def update_username(args, email, user_uuid, username, migratecluster, migratearv):
172     print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
173     if args.dry_run:
174         return
175     try:
176         conflicts = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()
177         if conflicts["items"]:
178             # There's already a user with the username, move the old user out of the way
179             migratearv.users().update(uuid=conflicts["items"][0]["uuid"],
180                                         bypass_federation=True,
181                                         body={"user": {"username": username+"migrate"}}).execute()
182         migratearv.users().update(uuid=user_uuid,
183                                     bypass_federation=True,
184                                     body={"user": {"username": username}}).execute()
185     except arvados.errors.ApiError as e:
186         print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
187
188
189 def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
190     candidates = []
191     conflict = False
192     for b in by_email[email].values():
193         if b[2].startswith(userhome):
194             candidates.append(b)
195         if b[1] != username and b[3] == userhome:
196             print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
197             conflict = True
198             break
199     if conflict:
200         return None
201     if len(candidates) == 0:
202         if len(userhome) == 5 and userhome not in clusters:
203             print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
204             return None
205         print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
206         if not args.dry_run:
207             oldhomecluster = old_user_uuid[0:5]
208             oldhomearv = clusters[oldhomecluster]
209             newhomecluster = userhome[0:5]
210             homearv = clusters[userhome]
211             user = None
212             try:
213                 olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
214                 conflicts = homearv.users().list(filters=[["username", "=", username]],
215                                                  bypass_federation=True).execute()
216                 if conflicts["items"]:
217                     homearv.users().update(
218                         uuid=conflicts["items"][0]["uuid"],
219                         bypass_federation=True,
220                         body={"user": {"username": username+"migrate"}}).execute()
221                 user = homearv.users().create(
222                     body={"user": {
223                         "email": email,
224                         "first_name": olduser["first_name"],
225                         "last_name": olduser["last_name"],
226                         "username": username,
227                         "is_active": olduser["is_active"]}}).execute()
228             except arvados.errors.ApiError as e:
229                 print("(%s) Could not create user: %s" % (email, str(e)))
230                 return None
231
232             tup = (email, username, user["uuid"], userhome)
233         else:
234             # dry run
235             tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
236         by_email[email][tup[2]] = tup
237         candidates.append(tup)
238     if len(candidates) > 1:
239         print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
240         return None
241     return candidates[0][2]
242
243
244 def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
245     # create a token for the new user and salt it for the
246     # migration cluster, then use it to access the migration
247     # cluster as the new user once before merging to ensure
248     # the new user is known on that cluster.
249     migratecluster = migratearv._rootDesc["uuidPrefix"]
250     try:
251         if not args.dry_run:
252             newtok = homearv.api_client_authorizations().create(body={
253                 "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
254         else:
255             newtok = {"uuid": "dry-run", "api_token": "12345"}
256     except arvados.errors.ApiError as e:
257         print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
258         return None
259
260     try:
261         findolduser = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()
262         if len(findolduser["items"]) == 0:
263             return False
264         if len(findolduser["items"]) == 1:
265             olduser = findolduser["items"][0]
266         else:
267             print("(%s) Unexpected result" % (email))
268             return None
269     except arvados.errors.ApiError as e:
270         print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
271         return None
272
273     salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
274                                                      msg=migratecluster.encode(),
275                                                      digestmod=hashlib.sha1).hexdigest()
276     try:
277         ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
278         if not args.dry_run:
279             newuser = arvados.api(host=ru.netloc, token=salted,
280                                   insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
281         else:
282             newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
283     except arvados.errors.ApiError as e:
284         print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
285         return None
286
287     if not newuser["is_active"] and olduser["is_active"]:
288         print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
289         try:
290             if not args.dry_run:
291                 migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
292                                           body={"is_active": True}).execute()
293         except arvados.errors.ApiError as e:
294             print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
295             return None
296
297     if olduser["is_admin"] and not newuser["is_admin"]:
298         print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s. Please ensure the user admin status is the same on both clusters. Note that a federated admin account has admin privileges on the entire federation." % (email, old_user_uuid, new_user_uuid, migratecluster))
299         return None
300
301     return newuser
302
303 def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
304     if args.dry_run:
305         return
306     try:
307         new_owner_uuid = new_user_uuid
308         if args.data_into_subproject:
309             grp = migratearv.groups().create(body={
310                 "owner_uuid": new_user_uuid,
311                 "name": "Migrated from %s (%s)" % (email, old_user_uuid),
312                 "group_class": "project"
313             }, ensure_unique_name=True).execute()
314             new_owner_uuid = grp["uuid"]
315         migratearv.users().merge(old_user_uuid=old_user_uuid,
316                                     new_user_uuid=new_user_uuid,
317                                     new_owner_uuid=new_owner_uuid,
318                                     redirect_to_new_user=True).execute()
319     except arvados.errors.ApiError as e:
320         name_collision = re.search(r'Key \(owner_uuid, name\)=\((.*?), (.*?)\) already exists\.\n.*UPDATE "(.*?)"', e._get_reason())
321         if name_collision:
322             target_owner, rsc_name, rsc_type = name_collision.groups()
323             print("(%s) Cannot migrate to %s because both origin and target users have a %s named '%s'. Please rename the conflicting items or use --data-into-subproject to migrate all users' data into a special subproject." % (email, target_owner, rsc_type[:-1], rsc_name))
324         else:
325             print("(%s) Skipping user migration because of error: %s" % (email, e))
326
327
328 def main():
329     parser = argparse.ArgumentParser(
330         description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html',
331         parents=[arv_cmd.retry_opt],
332     )
333     parser.add_argument(
334         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
335         help='Print version and exit.')
336     parser.add_argument('--tokens', type=str, metavar='FILE', required=False, help="Read tokens from FILE. Not needed when using LoginCluster.")
337     parser.add_argument('--data-into-subproject', action="store_true", help="Migrate user's data into a separate subproject. This can be used to avoid name collisions from within an account.")
338     group = parser.add_mutually_exclusive_group(required=True)
339     group.add_argument('--report', type=str, metavar='FILE', help="Generate report .csv file listing users by email address and their associated Arvados accounts.")
340     group.add_argument('--migrate', type=str, metavar='FILE', help="Consume report .csv and migrate users to designated Arvados accounts.")
341     group.add_argument('--dry-run', type=str, metavar='FILE', help="Consume report .csv and report how user would be migrated to designated Arvados accounts.")
342     group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected.")
343     args = parser.parse_args()
344
345     clusters, errors, loginCluster = connect_clusters(args)
346
347     if errors:
348         for e in errors:
349             print("ERROR: "+str(e))
350         exit(1)
351
352     if args.check:
353         print("Tokens file passed checks")
354         exit(0)
355
356     rows, by_email, by_username = fetch_users(clusters, loginCluster)
357
358     if args.report:
359         out = csv.writer(open(args.report, "wt"))
360         out.writerow(("email", "username", "user uuid", "home cluster"))
361         for r in rows:
362             out.writerow(r)
363         print("Wrote %s" % args.report)
364         return
365
366     if args.migrate or args.dry_run:
367         if args.dry_run:
368             print("Performing dry run")
369
370         rows = read_migrations(args, by_email, by_username)
371
372         for r in rows:
373             email = r[EMAIL]
374             username = r[USERNAME]
375             old_user_uuid = r[UUID]
376             userhome = r[HOMECLUSTER]
377
378             if userhome == "":
379                 print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
380             if old_user_uuid.startswith(userhome):
381                 migratecluster = old_user_uuid[0:5]
382                 migratearv = clusters[migratecluster]
383                 if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
384                     update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
385                 continue
386
387             new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
388             if new_user_uuid is None:
389                 continue
390
391             remote_users = {}
392             got_error = False
393             for migratecluster in clusters:
394                 # cluster where the migration is happening
395                 migratearv = clusters[migratecluster]
396
397                 # the user's new home cluster
398                 newhomecluster = userhome[0:5]
399                 homearv = clusters[newhomecluster]
400
401                 newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
402                 if newuser is None:
403                     got_error = True
404                 remote_users[migratecluster] = newuser
405
406             if not got_error:
407                 for migratecluster in clusters:
408                     migratearv = clusters[migratecluster]
409                     newuser = remote_users[migratecluster]
410                     if newuser is False:
411                         continue
412
413                     print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
414
415                     migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
416
417                     if newuser['username'] != username:
418                         update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
419
420 if __name__ == "__main__":
421     main()