20613: Log early discovery document retries
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 #
7 # Migration tool for merging user accounts belonging to the same user
8 # but on separate clusters to use a single user account managed by a
9 # specific cluster.
10 #
11 # If you're working on this, see
12 # arvados/sdk/python/tests/fed-migrate/README for information about
13 # the testing infrastructure.
14
15 import arvados
16 import arvados.util
17 import arvados.errors
18 import csv
19 import sys
20 import argparse
21 import hmac
22 import urllib.parse
23 import os
24 import hashlib
25 import re
26 from arvados._version import __version__
27 from . import _util as arv_cmd
28
29 EMAIL=0
30 USERNAME=1
31 UUID=2
32 HOMECLUSTER=3
33
34 def connect_clusters(args):
35     clusters = {}
36     errors = []
37     loginCluster = None
38     if args.tokens:
39         print("Reading %s" % args.tokens)
40         with open(args.tokens, "rt") as f:
41             for r in csv.reader(f):
42                 if len(r) != 2:
43                     continue
44                 host = r[0]
45                 token = r[1]
46                 print("Contacting %s" % (host))
47                 arv = arvados.api(host=host, token=token, cache=False, num_retries=args.retries)
48                 clusters[arv._rootDesc["uuidPrefix"]] = arv
49     else:
50         arv = arvados.api(cache=False, num_retries=args.retries)
51         rh = arv._rootDesc["remoteHosts"]
52         tok = arv.api_client_authorizations().current().execute()
53         token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
54
55         for k,v in rh.items():
56             arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
57             clusters[k] = arv
58
59     for _, arv in clusters.items():
60         config = arv.configs().get().execute()
61         if config["Login"]["LoginCluster"] != "" and loginCluster is None:
62             loginCluster = config["Login"]["LoginCluster"]
63
64     print("Checking that the federation is well connected")
65     for arv in clusters.values():
66         config = arv.configs().get().execute()
67         if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
68             errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
69             continue
70
71         if arv._rootDesc["revision"] < "20200331":
72             errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
73             continue
74
75         try:
76             cur = arv.users().current().execute()
77         except arvados.errors.ApiError as e:
78             errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
79             continue
80
81         if not cur["is_admin"]:
82             errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
83             continue
84
85         for r in clusters:
86             if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
87                 errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
88         for r in arv._rootDesc["remoteHosts"]:
89             if r != "*" and r not in clusters:
90                 print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
91
92     return clusters, errors, loginCluster
93
94
95 def fetch_users(clusters, loginCluster):
96     rows = []
97     by_email = {}
98     by_username = {}
99
100     users = []
101     for c, arv in clusters.items():
102         print("Getting user list from %s" % c)
103         ul = arvados.util.list_all(arv.users().list, bypass_federation=True)
104         for l in ul:
105             if l["uuid"].startswith(c):
106                 users.append(l)
107
108     # Users list is sorted by email
109     # Go through users and collect users with same email
110     # when we see a different email (or get to the end)
111     # call add_accum_rows() to generate the report rows with
112     # the "home cluster" set, and also fill in the by_email table.
113
114     users = sorted(users, key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])
115
116     accum = []
117     lastemail = None
118
119     def add_accum_rows():
120         homeuuid = None
121         for a in accum:
122             uuids = set(a["uuid"] for a in accum)
123             homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
124         for a in accum:
125             r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
126             by_email.setdefault(a["email"], {})
127             by_email[a["email"]][a["uuid"]] = r
128             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
129             if homeuuid_and_username not in by_username:
130                 by_username[homeuuid_and_username] = a["email"]
131             elif by_username[homeuuid_and_username] != a["email"]:
132                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
133                 exit(1)
134             rows.append(r)
135
136     for u in users:
137         if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
138             continue
139         if lastemail == None:
140             lastemail = u["email"]
141         if u["email"] == lastemail:
142             accum.append(u)
143         else:
144             add_accum_rows()
145             lastemail = u["email"]
146             accum = [u]
147
148     add_accum_rows()
149
150     return rows, by_email, by_username
151
152
153 def read_migrations(args, by_email, by_username):
154     rows = []
155     with open(args.migrate or args.dry_run, "rt") as f:
156         for r in csv.reader(f):
157             if r[EMAIL] == "email":
158                 continue
159             by_email.setdefault(r[EMAIL], {})
160             by_email[r[EMAIL]][r[UUID]] = r
161
162             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
163             if homeuuid_and_username not in by_username:
164                 by_username[homeuuid_and_username] = r[EMAIL]
165             elif by_username[homeuuid_and_username] != r[EMAIL]:
166                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
167                 exit(1)
168
169             rows.append(r)
170     return rows
171
172 def update_username(args, email, user_uuid, username, migratecluster, migratearv):
173     print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
174     if args.dry_run:
175         return
176     try:
177         conflicts = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()
178         if conflicts["items"]:
179             # There's already a user with the username, move the old user out of the way
180             migratearv.users().update(uuid=conflicts["items"][0]["uuid"],
181                                         bypass_federation=True,
182                                         body={"user": {"username": username+"migrate"}}).execute()
183         migratearv.users().update(uuid=user_uuid,
184                                     bypass_federation=True,
185                                     body={"user": {"username": username}}).execute()
186     except arvados.errors.ApiError as e:
187         print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
188
189
190 def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
191     candidates = []
192     conflict = False
193     for b in by_email[email].values():
194         if b[2].startswith(userhome):
195             candidates.append(b)
196         if b[1] != username and b[3] == userhome:
197             print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
198             conflict = True
199             break
200     if conflict:
201         return None
202     if len(candidates) == 0:
203         if len(userhome) == 5 and userhome not in clusters:
204             print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
205             return None
206         print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
207         if not args.dry_run:
208             oldhomecluster = old_user_uuid[0:5]
209             oldhomearv = clusters[oldhomecluster]
210             newhomecluster = userhome[0:5]
211             homearv = clusters[userhome]
212             user = None
213             try:
214                 olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
215                 conflicts = homearv.users().list(filters=[["username", "=", username]],
216                                                  bypass_federation=True).execute()
217                 if conflicts["items"]:
218                     homearv.users().update(
219                         uuid=conflicts["items"][0]["uuid"],
220                         bypass_federation=True,
221                         body={"user": {"username": username+"migrate"}}).execute()
222                 user = homearv.users().create(
223                     body={"user": {
224                         "email": email,
225                         "first_name": olduser["first_name"],
226                         "last_name": olduser["last_name"],
227                         "username": username,
228                         "is_active": olduser["is_active"]}}).execute()
229             except arvados.errors.ApiError as e:
230                 print("(%s) Could not create user: %s" % (email, str(e)))
231                 return None
232
233             tup = (email, username, user["uuid"], userhome)
234         else:
235             # dry run
236             tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
237         by_email[email][tup[2]] = tup
238         candidates.append(tup)
239     if len(candidates) > 1:
240         print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
241         return None
242     return candidates[0][2]
243
244
245 def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
246     # create a token for the new user and salt it for the
247     # migration cluster, then use it to access the migration
248     # cluster as the new user once before merging to ensure
249     # the new user is known on that cluster.
250     migratecluster = migratearv._rootDesc["uuidPrefix"]
251     try:
252         if not args.dry_run:
253             newtok = homearv.api_client_authorizations().create(body={
254                 "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
255         else:
256             newtok = {"uuid": "dry-run", "api_token": "12345"}
257     except arvados.errors.ApiError as e:
258         print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
259         return None
260
261     try:
262         findolduser = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()
263         if len(findolduser["items"]) == 0:
264             return False
265         if len(findolduser["items"]) == 1:
266             olduser = findolduser["items"][0]
267         else:
268             print("(%s) Unexpected result" % (email))
269             return None
270     except arvados.errors.ApiError as e:
271         print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
272         return None
273
274     salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
275                                                      msg=migratecluster.encode(),
276                                                      digestmod=hashlib.sha1).hexdigest()
277     try:
278         ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
279         if not args.dry_run:
280             newuser = arvados.api(host=ru.netloc, token=salted,
281                                   insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
282         else:
283             newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
284     except arvados.errors.ApiError as e:
285         print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
286         return None
287
288     if not newuser["is_active"] and olduser["is_active"]:
289         print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
290         try:
291             if not args.dry_run:
292                 migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
293                                           body={"is_active": True}).execute()
294         except arvados.errors.ApiError as e:
295             print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
296             return None
297
298     if olduser["is_admin"] and not newuser["is_admin"]:
299         print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s. Please ensure the user admin status is the same on both clusters. Note that a federated admin account has admin privileges on the entire federation." % (email, old_user_uuid, new_user_uuid, migratecluster))
300         return None
301
302     return newuser
303
304 def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
305     if args.dry_run:
306         return
307     try:
308         new_owner_uuid = new_user_uuid
309         if args.data_into_subproject:
310             grp = migratearv.groups().create(body={
311                 "owner_uuid": new_user_uuid,
312                 "name": "Migrated from %s (%s)" % (email, old_user_uuid),
313                 "group_class": "project"
314             }, ensure_unique_name=True).execute()
315             new_owner_uuid = grp["uuid"]
316         migratearv.users().merge(old_user_uuid=old_user_uuid,
317                                     new_user_uuid=new_user_uuid,
318                                     new_owner_uuid=new_owner_uuid,
319                                     redirect_to_new_user=True).execute()
320     except arvados.errors.ApiError as e:
321         name_collision = re.search(r'Key \(owner_uuid, name\)=\((.*?), (.*?)\) already exists\.\n.*UPDATE "(.*?)"', e._get_reason())
322         if name_collision:
323             target_owner, rsc_name, rsc_type = name_collision.groups()
324             print("(%s) Cannot migrate to %s because both origin and target users have a %s named '%s'. Please rename the conflicting items or use --data-into-subproject to migrate all users' data into a special subproject." % (email, target_owner, rsc_type[:-1], rsc_name))
325         else:
326             print("(%s) Skipping user migration because of error: %s" % (email, e))
327
328
329 def main():
330     parser = argparse.ArgumentParser(
331         description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html',
332         parents=[arv_cmd.retry_opt],
333     )
334     parser.add_argument(
335         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
336         help='Print version and exit.')
337     parser.add_argument('--tokens', type=str, metavar='FILE', required=False, help="Read tokens from FILE. Not needed when using LoginCluster.")
338     parser.add_argument('--data-into-subproject', action="store_true", help="Migrate user's data into a separate subproject. This can be used to avoid name collisions from within an account.")
339     group = parser.add_mutually_exclusive_group(required=True)
340     group.add_argument('--report', type=str, metavar='FILE', help="Generate report .csv file listing users by email address and their associated Arvados accounts.")
341     group.add_argument('--migrate', type=str, metavar='FILE', help="Consume report .csv and migrate users to designated Arvados accounts.")
342     group.add_argument('--dry-run', type=str, metavar='FILE', help="Consume report .csv and report how user would be migrated to designated Arvados accounts.")
343     group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected.")
344     args = parser.parse_args()
345
346     clusters, errors, loginCluster = connect_clusters(args)
347
348     if errors:
349         for e in errors:
350             print("ERROR: "+str(e))
351         exit(1)
352
353     if args.check:
354         print("Tokens file passed checks")
355         exit(0)
356
357     rows, by_email, by_username = fetch_users(clusters, loginCluster)
358
359     if args.report:
360         out = csv.writer(open(args.report, "wt"))
361         out.writerow(("email", "username", "user uuid", "home cluster"))
362         for r in rows:
363             out.writerow(r)
364         print("Wrote %s" % args.report)
365         return
366
367     if args.migrate or args.dry_run:
368         if args.dry_run:
369             print("Performing dry run")
370
371         rows = read_migrations(args, by_email, by_username)
372
373         for r in rows:
374             email = r[EMAIL]
375             username = r[USERNAME]
376             old_user_uuid = r[UUID]
377             userhome = r[HOMECLUSTER]
378
379             if userhome == "":
380                 print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
381             if old_user_uuid.startswith(userhome):
382                 migratecluster = old_user_uuid[0:5]
383                 migratearv = clusters[migratecluster]
384                 if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
385                     update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
386                 continue
387
388             new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
389             if new_user_uuid is None:
390                 continue
391
392             remote_users = {}
393             got_error = False
394             for migratecluster in clusters:
395                 # cluster where the migration is happening
396                 migratearv = clusters[migratecluster]
397
398                 # the user's new home cluster
399                 newhomecluster = userhome[0:5]
400                 homearv = clusters[newhomecluster]
401
402                 newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
403                 if newuser is None:
404                     got_error = True
405                 remote_users[migratecluster] = newuser
406
407             if not got_error:
408                 for migratecluster in clusters:
409                     migratearv = clusters[migratecluster]
410                     newuser = remote_users[migratecluster]
411                     if newuser is False:
412                         continue
413
414                     print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
415
416                     migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
417
418                     if newuser['username'] != username:
419                         update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
420
421 if __name__ == "__main__":
422     main()