16589: Fixes test check.
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 #
7 # Migration tool for merging user accounts belonging to the same user
8 # but on separate clusters to use a single user account managed by a
9 # specific cluster.
10 #
11 # If you're working on this, see
12 # arvados/sdk/python/tests/fed-migrate/README for information about
13 # the testing infrastructure.
14
15 import arvados
16 import arvados.util
17 import arvados.errors
18 import csv
19 import sys
20 import argparse
21 import hmac
22 import urllib.parse
23 import os
24 import hashlib
25 from arvados._version import __version__
26
27 EMAIL=0
28 USERNAME=1
29 UUID=2
30 HOMECLUSTER=3
31
32 def connect_clusters(args):
33     clusters = {}
34     errors = []
35     loginCluster = None
36     if args.tokens:
37         print("Reading %s" % args.tokens)
38         with open(args.tokens, "rt") as f:
39             for r in csv.reader(f):
40                 if len(r) != 2:
41                     continue
42                 host = r[0]
43                 token = r[1]
44                 print("Contacting %s" % (host))
45                 arv = arvados.api(host=host, token=token, cache=False)
46                 clusters[arv._rootDesc["uuidPrefix"]] = arv
47     else:
48         arv = arvados.api(cache=False)
49         rh = arv._rootDesc["remoteHosts"]
50         tok = arv.api_client_authorizations().current().execute()
51         token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
52
53         for k,v in rh.items():
54             arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
55             clusters[k] = arv
56
57     for _, arv in clusters.items():
58         config = arv.configs().get().execute()
59         if config["Login"]["LoginCluster"] != "" and loginCluster is None:
60             loginCluster = config["Login"]["LoginCluster"]
61
62     print("Checking that the federation is well connected")
63     for arv in clusters.values():
64         config = arv.configs().get().execute()
65         if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
66             errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
67             continue
68
69         if arv._rootDesc["revision"] < "20200331":
70             errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
71             continue
72
73         try:
74             cur = arv.users().current().execute()
75         except arvados.errors.ApiError as e:
76             errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
77             continue
78
79         if not cur["is_admin"]:
80             errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
81             continue
82
83         for r in clusters:
84             if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
85                 errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
86         for r in arv._rootDesc["remoteHosts"]:
87             if r != "*" and r not in clusters:
88                 print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
89
90     return clusters, errors, loginCluster
91
92
93 def fetch_users(clusters, loginCluster):
94     rows = []
95     by_email = {}
96     by_username = {}
97
98     users = []
99     for c, arv in clusters.items():
100         print("Getting user list from %s" % c)
101         ul = arvados.util.list_all(arv.users().list, bypass_federation=True)
102         for l in ul:
103             if l["uuid"].startswith(c):
104                 users.append(l)
105
106     # Users list is sorted by email
107     # Go through users and collect users with same email
108     # when we see a different email (or get to the end)
109     # call add_accum_rows() to generate the report rows with
110     # the "home cluster" set, and also fill in the by_email table.
111
112     users = sorted(users, key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])
113
114     accum = []
115     lastemail = None
116
117     def add_accum_rows():
118         homeuuid = None
119         for a in accum:
120             uuids = set(a["uuid"] for a in accum)
121             homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
122         for a in accum:
123             r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
124             by_email.setdefault(a["email"], {})
125             by_email[a["email"]][a["uuid"]] = r
126             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
127             if homeuuid_and_username not in by_username:
128                 by_username[homeuuid_and_username] = a["email"]
129             elif by_username[homeuuid_and_username] != a["email"]:
130                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
131                 exit(1)
132             rows.append(r)
133
134     for u in users:
135         if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
136             continue
137         if lastemail == None:
138             lastemail = u["email"]
139         if u["email"] == lastemail:
140             accum.append(u)
141         else:
142             add_accum_rows()
143             lastemail = u["email"]
144             accum = [u]
145
146     add_accum_rows()
147
148     return rows, by_email, by_username
149
150
151 def read_migrations(args, by_email, by_username):
152     rows = []
153     with open(args.migrate or args.dry_run, "rt") as f:
154         for r in csv.reader(f):
155             if r[EMAIL] == "email":
156                 continue
157             by_email.setdefault(r[EMAIL], {})
158             by_email[r[EMAIL]][r[UUID]] = r
159
160             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
161             if homeuuid_and_username not in by_username:
162                 by_username[homeuuid_and_username] = r[EMAIL]
163             elif by_username[homeuuid_and_username] != r[EMAIL]:
164                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
165                 exit(1)
166
167             rows.append(r)
168     return rows
169
170 def update_username(args, email, user_uuid, username, migratecluster, migratearv):
171     print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
172     if args.dry_run:
173         return
174     try:
175         conflicts = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()
176         if conflicts["items"]:
177             # There's already a user with the username, move the old user out of the way
178             migratearv.users().update(uuid=conflicts["items"][0]["uuid"],
179                                         bypass_federation=True,
180                                         body={"user": {"username": username+"migrate"}}).execute()
181         migratearv.users().update(uuid=user_uuid,
182                                     bypass_federation=True,
183                                     body={"user": {"username": username}}).execute()
184     except arvados.errors.ApiError as e:
185         print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
186
187
188 def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
189     candidates = []
190     conflict = False
191     for b in by_email[email].values():
192         if b[2].startswith(userhome):
193             candidates.append(b)
194         if b[1] != username and b[3] == userhome:
195             print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
196             conflict = True
197             break
198     if conflict:
199         return None
200     if len(candidates) == 0:
201         if len(userhome) == 5 and userhome not in clusters:
202             print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
203             return None
204         print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
205         if not args.dry_run:
206             oldhomecluster = old_user_uuid[0:5]
207             oldhomearv = clusters[oldhomecluster]
208             newhomecluster = userhome[0:5]
209             homearv = clusters[userhome]
210             user = None
211             try:
212                 olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
213                 conflicts = homearv.users().list(filters=[["username", "=", username]],
214                                                  bypass_federation=True).execute()
215                 if conflicts["items"]:
216                     homearv.users().update(
217                         uuid=conflicts["items"][0]["uuid"],
218                         bypass_federation=True,
219                         body={"user": {"username": username+"migrate"}}).execute()
220                 user = homearv.users().create(
221                     body={"user": {
222                         "email": email,
223                         "first_name": olduser["first_name"],
224                         "last_name": olduser["last_name"],
225                         "username": username,
226                         "is_active": olduser["is_active"]}}).execute()
227             except arvados.errors.ApiError as e:
228                 print("(%s) Could not create user: %s" % (email, str(e)))
229                 return None
230
231             tup = (email, username, user["uuid"], userhome)
232         else:
233             # dry run
234             tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
235         by_email[email][tup[2]] = tup
236         candidates.append(tup)
237     if len(candidates) > 1:
238         print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
239         return None
240     return candidates[0][2]
241
242
243 def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
244     # create a token for the new user and salt it for the
245     # migration cluster, then use it to access the migration
246     # cluster as the new user once before merging to ensure
247     # the new user is known on that cluster.
248     migratecluster = migratearv._rootDesc["uuidPrefix"]
249     try:
250         if not args.dry_run:
251             newtok = homearv.api_client_authorizations().create(body={
252                 "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
253         else:
254             newtok = {"uuid": "dry-run", "api_token": "12345"}
255     except arvados.errors.ApiError as e:
256         print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
257         return None
258
259     try:
260         findolduser = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()
261         if len(findolduser["items"]) == 0:
262             return False
263         if len(findolduser["items"]) == 1:
264             olduser = findolduser["items"][0]
265         else:
266             print("(%s) Unexpected result" % (email))
267             return None
268     except arvados.errors.ApiError as e:
269         print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
270         return None
271
272     salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
273                                                      msg=migratecluster.encode(),
274                                                      digestmod=hashlib.sha1).hexdigest()
275     try:
276         ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
277         if not args.dry_run:
278             newuser = arvados.api(host=ru.netloc, token=salted,
279                                   insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
280         else:
281             newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
282     except arvados.errors.ApiError as e:
283         print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
284         return None
285
286     if not newuser["is_active"] and olduser["is_active"]:
287         print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
288         try:
289             if not args.dry_run:
290                 migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
291                                           body={"is_active": True}).execute()
292         except arvados.errors.ApiError as e:
293             print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
294             return None
295
296     if olduser["is_admin"] and not newuser["is_admin"]:
297         import pprint
298         print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
299         return None
300
301     return newuser
302
303 def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
304     if args.dry_run:
305         return
306     try:
307         new_owner_uuid = new_user_uuid
308         if not args.avoid_subproject_creation:
309             grp = migratearv.groups().create(body={
310                 "owner_uuid": new_user_uuid,
311                 "name": "Migrated from %s (%s)" % (email, old_user_uuid),
312                 "group_class": "project"
313             }, ensure_unique_name=True).execute()
314             new_owner_uuid = grp["uuid"]
315         migratearv.users().merge(old_user_uuid=old_user_uuid,
316                                     new_user_uuid=new_user_uuid,
317                                     new_owner_uuid=new_owner_uuid,
318                                     redirect_to_new_user=True).execute()
319     except arvados.errors.ApiError as e:
320         print("(%s) Skipping user migration because of error: %s" % (email, e))
321
322
323 def main():
324     parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html')
325     parser.add_argument(
326         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
327         help='Print version and exit.')
328     parser.add_argument('--tokens', type=str, metavar='FILE', required=False, help="Read tokens from FILE. Not needed when using LoginCluster.")
329     parser.add_argument('--avoid-subproject-creation', action="store_true", help="Don't migrate user's data into a separate subproject. This may cause collection or project name collisions from within an account.")
330     group = parser.add_mutually_exclusive_group(required=True)
331     group.add_argument('--report', type=str, metavar='FILE', help="Generate report .csv file listing users by email address and their associated Arvados accounts.")
332     group.add_argument('--migrate', type=str, metavar='FILE', help="Consume report .csv and migrate users to designated Arvados accounts.")
333     group.add_argument('--dry-run', type=str, metavar='FILE', help="Consume report .csv and report how user would be migrated to designated Arvados accounts.")
334     group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected.")
335     args = parser.parse_args()
336
337     clusters, errors, loginCluster = connect_clusters(args)
338
339     if errors:
340         for e in errors:
341             print("ERROR: "+str(e))
342         exit(1)
343
344     if args.check:
345         print("Tokens file passed checks")
346         exit(0)
347
348     rows, by_email, by_username = fetch_users(clusters, loginCluster)
349
350     if args.report:
351         out = csv.writer(open(args.report, "wt"))
352         out.writerow(("email", "username", "user uuid", "home cluster"))
353         for r in rows:
354             out.writerow(r)
355         print("Wrote %s" % args.report)
356         return
357
358     if args.migrate or args.dry_run:
359         if args.dry_run:
360             print("Performing dry run")
361
362         rows = read_migrations(args, by_email, by_username)
363
364         for r in rows:
365             email = r[EMAIL]
366             username = r[USERNAME]
367             old_user_uuid = r[UUID]
368             userhome = r[HOMECLUSTER]
369
370             if userhome == "":
371                 print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
372             if old_user_uuid.startswith(userhome):
373                 migratecluster = old_user_uuid[0:5]
374                 migratearv = clusters[migratecluster]
375                 if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
376                     update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
377                 continue
378
379             new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
380             if new_user_uuid is None:
381                 continue
382
383             remote_users = {}
384             got_error = False
385             for migratecluster in clusters:
386                 # cluster where the migration is happening
387                 migratearv = clusters[migratecluster]
388
389                 # the user's new home cluster
390                 newhomecluster = userhome[0:5]
391                 homearv = clusters[newhomecluster]
392
393                 newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
394                 if newuser is None:
395                     got_error = True
396                 remote_users[migratecluster] = newuser
397
398             if not got_error:
399                 for migratecluster in clusters:
400                     migratearv = clusters[migratecluster]
401                     newuser = remote_users[migratecluster]
402                     if newuser is False:
403                         continue
404
405                     print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
406
407                     migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
408
409                     if newuser['username'] != username:
410                         update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
411
412 if __name__ == "__main__":
413     main()