16589: Fixes dry-run mode. Adds command arg to avoid subproject creation.
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 #
7 # Migration tool for merging user accounts belonging to the same user
8 # but on separate clusters to use a single user account managed by a
9 # specific cluster.
10 #
11 # If you're working on this, see
12 # arvados/sdk/python/tests/fed-migrate/README for information about
13 # the testing infrastructure.
14
15 import arvados
16 import arvados.util
17 import arvados.errors
18 import csv
19 import sys
20 import argparse
21 import hmac
22 import urllib.parse
23 import os
24 import hashlib
25 from arvados._version import __version__
26
27 EMAIL=0
28 USERNAME=1
29 UUID=2
30 HOMECLUSTER=3
31
32 def connect_clusters(args):
33     clusters = {}
34     errors = []
35     loginCluster = None
36     if args.tokens:
37         print("Reading %s" % args.tokens)
38         with open(args.tokens, "rt") as f:
39             for r in csv.reader(f):
40                 if len(r) != 2:
41                     continue
42                 host = r[0]
43                 token = r[1]
44                 print("Contacting %s" % (host))
45                 arv = arvados.api(host=host, token=token, cache=False)
46                 clusters[arv._rootDesc["uuidPrefix"]] = arv
47     else:
48         arv = arvados.api(cache=False)
49         rh = arv._rootDesc["remoteHosts"]
50         tok = arv.api_client_authorizations().current().execute()
51         token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
52
53         for k,v in rh.items():
54             arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
55             clusters[k] = arv
56
57     for _, arv in clusters.items():
58         config = arv.configs().get().execute()
59         if config["Login"]["LoginCluster"] != "" and loginCluster is None:
60             loginCluster = config["Login"]["LoginCluster"]
61
62     print("Checking that the federation is well connected")
63     for arv in clusters.values():
64         config = arv.configs().get().execute()
65         if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
66             errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
67             continue
68
69         if arv._rootDesc["revision"] < "20200331":
70             errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
71             continue
72
73         try:
74             cur = arv.users().current().execute()
75         except arvados.errors.ApiError as e:
76             errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
77             continue
78
79         if not cur["is_admin"]:
80             errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
81             continue
82
83         for r in clusters:
84             if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
85                 errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
86         for r in arv._rootDesc["remoteHosts"]:
87             if r != "*" and r not in clusters:
88                 print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
89
90     return clusters, errors, loginCluster
91
92
93 def fetch_users(clusters, loginCluster):
94     rows = []
95     by_email = {}
96     by_username = {}
97
98     users = []
99     for c, arv in clusters.items():
100         print("Getting user list from %s" % c)
101         ul = arvados.util.list_all(arv.users().list, bypass_federation=True)
102         for l in ul:
103             if l["uuid"].startswith(c):
104                 users.append(l)
105
106     # Users list is sorted by email
107     # Go through users and collect users with same email
108     # when we see a different email (or get to the end)
109     # call add_accum_rows() to generate the report rows with
110     # the "home cluster" set, and also fill in the by_email table.
111
112     users = sorted(users, key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])
113
114     accum = []
115     lastemail = None
116
117     def add_accum_rows():
118         homeuuid = None
119         for a in accum:
120             uuids = set(a["uuid"] for a in accum)
121             homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
122         for a in accum:
123             r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
124             by_email.setdefault(a["email"], {})
125             by_email[a["email"]][a["uuid"]] = r
126             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
127             if homeuuid_and_username not in by_username:
128                 by_username[homeuuid_and_username] = a["email"]
129             elif by_username[homeuuid_and_username] != a["email"]:
130                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
131                 exit(1)
132             rows.append(r)
133
134     for u in users:
135         if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
136             continue
137         if lastemail == None:
138             lastemail = u["email"]
139         if u["email"] == lastemail:
140             accum.append(u)
141         else:
142             add_accum_rows()
143             lastemail = u["email"]
144             accum = [u]
145
146     add_accum_rows()
147
148     return rows, by_email, by_username
149
150
151 def read_migrations(args, by_email, by_username):
152     rows = []
153     with open(args.migrate or args.dry_run, "rt") as f:
154         for r in csv.reader(f):
155             if r[EMAIL] == "email":
156                 continue
157             by_email.setdefault(r[EMAIL], {})
158             by_email[r[EMAIL]][r[UUID]] = r
159
160             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
161             if homeuuid_and_username not in by_username:
162                 by_username[homeuuid_and_username] = r[EMAIL]
163             elif by_username[homeuuid_and_username] != r[EMAIL]:
164                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
165                 exit(1)
166
167             rows.append(r)
168     return rows
169
170 def update_username(args, email, user_uuid, username, migratecluster, migratearv):
171     print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
172     if args.dry_run:
173         return
174     try:
175         conflicts = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()
176         if conflicts["items"]:
177             # There's already a user with the username, move the old user out of the way
178             migratearv.users().update(uuid=conflicts["items"][0]["uuid"],
179                                         bypass_federation=True,
180                                         body={"user": {"username": username+"migrate"}}).execute()
181         migratearv.users().update(uuid=user_uuid,
182                                     bypass_federation=True,
183                                     body={"user": {"username": username}}).execute()
184     except arvados.errors.ApiError as e:
185         print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
186
187
188 def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
189     candidates = []
190     conflict = False
191     for b in by_email[email].values():
192         if b[2].startswith(userhome):
193             candidates.append(b)
194         if b[1] != username and b[3] == userhome:
195             print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
196             conflict = True
197             break
198     if conflict:
199         return None
200     if len(candidates) == 0:
201         if len(userhome) == 5 and userhome not in clusters:
202             print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
203             return None
204         print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
205         if not args.dry_run:
206             oldhomecluster = old_user_uuid[0:5]
207             oldhomearv = clusters[oldhomecluster]
208             newhomecluster = userhome[0:5]
209             homearv = clusters[userhome]
210             user = None
211             try:
212                 olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
213                 conflicts = homearv.users().list(filters=[["username", "=", username]],
214                                                  bypass_federation=True).execute()
215                 if conflicts["items"]:
216                     homearv.users().update(
217                         uuid=conflicts["items"][0]["uuid"],
218                         bypass_federation=True,
219                         body={"user": {"username": username+"migrate"}}).execute()
220                 user = homearv.users().create(
221                     body={"user": {
222                         "email": email,
223                         "username": username,
224                         "is_active": olduser["is_active"]}}).execute()
225             except arvados.errors.ApiError as e:
226                 print("(%s) Could not create user: %s" % (email, str(e)))
227                 return None
228
229             tup = (email, username, user["uuid"], userhome)
230         else:
231             # dry run
232             tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
233         by_email[email][tup[2]] = tup
234         candidates.append(tup)
235     if len(candidates) > 1:
236         print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
237         return None
238     return candidates[0][2]
239
240
241 def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
242     # create a token for the new user and salt it for the
243     # migration cluster, then use it to access the migration
244     # cluster as the new user once before merging to ensure
245     # the new user is known on that cluster.
246     migratecluster = migratearv._rootDesc["uuidPrefix"]
247     try:
248         if not args.dry_run:
249             newtok = homearv.api_client_authorizations().create(body={
250                 "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
251         else:
252             newtok = {"uuid": "dry-run", "api_token": "12345"}
253     except arvados.errors.ApiError as e:
254         print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
255         return None
256
257     try:
258         findolduser = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()
259         if len(findolduser["items"]) == 0:
260             return False
261         if len(findolduser["items"]) == 1:
262             olduser = findolduser["items"][0]
263         else:
264             print("(%s) Unexpected result" % (email))
265             return None
266     except arvados.errors.ApiError as e:
267         print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
268         return None
269
270     salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
271                                                      msg=migratecluster.encode(),
272                                                      digestmod=hashlib.sha1).hexdigest()
273     try:
274         ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
275         if not args.dry_run:
276             newuser = arvados.api(host=ru.netloc, token=salted,
277                                   insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
278         else:
279             newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
280     except arvados.errors.ApiError as e:
281         print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
282         return None
283
284     if not newuser["is_active"] and olduser["is_active"]:
285         print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
286         try:
287             if not args.dry_run:
288                 migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
289                                           body={"is_active": True}).execute()
290         except arvados.errors.ApiError as e:
291             print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
292             return None
293
294     if olduser["is_admin"] and not newuser["is_admin"]:
295         import pprint
296         print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
297         return None
298
299     return newuser
300
301 def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
302     if args.dry_run:
303         return
304     try:
305         new_owner_uuid = new_user_uuid
306         if not args.avoid_subproject_creation:
307             grp = migratearv.groups().create(body={
308                 "owner_uuid": new_user_uuid,
309                 "name": "Migrated from %s (%s)" % (email, old_user_uuid),
310                 "group_class": "project"
311             }, ensure_unique_name=True).execute()
312             new_owner_uuid = grp["uuid"]
313         migratearv.users().merge(old_user_uuid=old_user_uuid,
314                                     new_user_uuid=new_user_uuid,
315                                     new_owner_uuid=new_owner_uuid,
316                                     redirect_to_new_user=True).execute()
317     except arvados.errors.ApiError as e:
318         print("(%s) Skipping user migration because of error: %s" % (email, e))
319
320
321 def main():
322     parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html')
323     parser.add_argument(
324         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
325         help='Print version and exit.')
326     parser.add_argument('--tokens', type=str, metavar='FILE', required=False, help="Read tokens from FILE. Not needed when using LoginCluster.")
327     parser.add_argument('--avoid-subproject-creation', action="store_true", help="Don't migrate user's data into a separate subproject. This may cause collection or project name collisions from within an account.")
328     group = parser.add_mutually_exclusive_group(required=True)
329     group.add_argument('--report', type=str, metavar='FILE', help="Generate report .csv file listing users by email address and their associated Arvados accounts.")
330     group.add_argument('--migrate', type=str, metavar='FILE', help="Consume report .csv and migrate users to designated Arvados accounts.")
331     group.add_argument('--dry-run', type=str, metavar='FILE', help="Consume report .csv and report how user would be migrated to designated Arvados accounts.")
332     group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected.")
333     args = parser.parse_args()
334
335     clusters, errors, loginCluster = connect_clusters(args)
336
337     if errors:
338         for e in errors:
339             print("ERROR: "+str(e))
340         exit(1)
341
342     if args.check:
343         print("Tokens file passed checks")
344         exit(0)
345
346     rows, by_email, by_username = fetch_users(clusters, loginCluster)
347
348     if args.report:
349         out = csv.writer(open(args.report, "wt"))
350         out.writerow(("email", "username", "user uuid", "home cluster"))
351         for r in rows:
352             out.writerow(r)
353         print("Wrote %s" % args.report)
354         return
355
356     if args.migrate or args.dry_run:
357         if args.dry_run:
358             print("Performing dry run")
359
360         rows = read_migrations(args, by_email, by_username)
361
362         for r in rows:
363             email = r[EMAIL]
364             username = r[USERNAME]
365             old_user_uuid = r[UUID]
366             userhome = r[HOMECLUSTER]
367
368             if userhome == "":
369                 print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
370             if old_user_uuid.startswith(userhome):
371                 migratecluster = old_user_uuid[0:5]
372                 migratearv = clusters[migratecluster]
373                 if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
374                     update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
375                 continue
376
377             new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
378             if new_user_uuid is None:
379                 continue
380
381             remote_users = {}
382             got_error = False
383             for migratecluster in clusters:
384                 # cluster where the migration is happening
385                 migratearv = clusters[migratecluster]
386
387                 # the user's new home cluster
388                 newhomecluster = userhome[0:5]
389                 homearv = clusters[newhomecluster]
390
391                 newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
392                 if newuser is None:
393                     got_error = True
394                 remote_users[migratecluster] = newuser
395
396             if not got_error:
397                 for migratecluster in clusters:
398                     migratearv = clusters[migratecluster]
399                     newuser = remote_users[migratecluster]
400                     if newuser is False:
401                         continue
402
403                     print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
404
405                     migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
406
407                     if newuser['username'] != username:
408                         update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
409
410 if __name__ == "__main__":
411     main()