e1b8ee7d8402f5834ba9391c6fddb78b5d79ef14
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 #
7 # Migration tool for merging user accounts belonging to the same user
8 # but on separate clusters to use a single user account managed by a
9 # specific cluster.
10 #
11 # If you're working on this, see
12 # arvados/sdk/python/tests/fed-migrate/README for information about
13 # the testing infrastructure.
14
15 import arvados
16 import arvados.util
17 import arvados.errors
18 import csv
19 import sys
20 import argparse
21 import hmac
22 import urllib.parse
23 import os
24 import hashlib
25 from arvados._version import __version__
26
27 EMAIL=0
28 USERNAME=1
29 UUID=2
30 HOMECLUSTER=3
31
32 def connect_clusters(args):
33     clusters = {}
34     errors = []
35     loginCluster = None
36     if args.tokens:
37         print("Reading %s" % args.tokens)
38         with open(args.tokens, "rt") as f:
39             for r in csv.reader(f):
40                 if len(r) != 2:
41                     continue
42                 host = r[0]
43                 token = r[1]
44                 print("Contacting %s" % (host))
45                 arv = arvados.api(host=host, token=token, cache=False)
46                 clusters[arv._rootDesc["uuidPrefix"]] = arv
47     else:
48         arv = arvados.api(cache=False)
49         rh = arv._rootDesc["remoteHosts"]
50         tok = arv.api_client_authorizations().current().execute()
51         token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
52
53         for k,v in rh.items():
54             arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
55             clusters[k] = arv
56
57     for _, arv in clusters.items():
58         config = arv.configs().get().execute()
59         if config["Login"]["LoginCluster"] != "" and loginCluster is None:
60             loginCluster = config["Login"]["LoginCluster"]
61
62     print("Checking that the federation is well connected")
63     for arv in clusters.values():
64         config = arv.configs().get().execute()
65         if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
66             errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
67             continue
68
69         if arv._rootDesc["revision"] < "20190926":
70             errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 1.5 before running migration." % config["ClusterID"])
71             continue
72
73         try:
74             cur = arv.users().current().execute()
75         except arvados.errors.ApiError as e:
76             errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
77             continue
78
79         if not cur["is_admin"]:
80             errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
81             continue
82
83         for r in clusters:
84             if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
85                 errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
86         for r in arv._rootDesc["remoteHosts"]:
87             if r != "*" and r not in clusters:
88                 print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
89
90     return clusters, errors, loginCluster
91
92
93 def fetch_users(clusters, loginCluster):
94     rows = []
95     by_email = {}
96     by_username = {}
97
98     users = []
99     for c, arv in clusters.items():
100         print("Getting user list from %s" % c)
101         ul = arvados.util.list_all(arv.users().list, no_federation=True)
102         for l in ul:
103             if l["uuid"].startswith(c):
104                 users.append(l)
105
106     # Users list is sorted by email
107     # Go through users and collect users with same email
108     # when we see a different email (or get to the end)
109     # call add_accum_rows() to generate the report rows with
110     # the "home cluster" set, and also fill in the by_email table.
111
112     users = sorted(users, key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])
113
114     accum = []
115     lastemail = None
116
117     def add_accum_rows():
118         homeuuid = None
119         for a in accum:
120             uuids = set(a["uuid"] for a in accum)
121             homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
122         for a in accum:
123             r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
124             by_email.setdefault(a["email"], {})
125             by_email[a["email"]][a["uuid"]] = r
126             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
127             if homeuuid_and_username not in by_username:
128                 by_username[homeuuid_and_username] = a["email"]
129             elif by_username[homeuuid_and_username] != a["email"]:
130                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
131                 exit(1)
132             rows.append(r)
133
134     for u in users:
135         if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
136             continue
137         if lastemail == None:
138             lastemail = u["email"]
139         if u["email"] == lastemail:
140             accum.append(u)
141         else:
142             add_accum_rows()
143             lastemail = u["email"]
144             accum = [u]
145
146     add_accum_rows()
147
148     return rows, by_email, by_username
149
150
151 def read_migrations(args, by_email, by_username):
152     rows = []
153     with open(args.migrate or args.dry_run, "rt") as f:
154         for r in csv.reader(f):
155             if r[EMAIL] == "email":
156                 continue
157             by_email.setdefault(r[EMAIL], {})
158             by_email[r[EMAIL]][r[UUID]] = r
159
160             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
161             if homeuuid_and_username not in by_username:
162                 by_username[homeuuid_and_username] = r[EMAIL]
163             elif by_username[homeuuid_and_username] != r[EMAIL]:
164                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
165                 exit(1)
166
167             rows.append(r)
168     return rows
169
170 def update_username(args, email, user_uuid, username, migratecluster, migratearv):
171     print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
172     if not args.dry_run:
173         try:
174             conflicts = migratearv.users().list(filters=[["username", "=", username]], no_federation=True).execute()
175             if conflicts["items"]:
176                 migratearv.users().update(uuid=conflicts["items"][0]["uuid"], body={"user": {"username": username+"migrate"}}).execute()
177             migratearv.users().update(uuid=user_uuid, body={"user": {"username": username}}).execute()
178         except arvados.errors.ApiError as e:
179             print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
180
181
182 def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
183     candidates = []
184     conflict = False
185     for b in by_email[email].values():
186         if b[2].startswith(userhome):
187             candidates.append(b)
188         if b[1] != username and b[3] == userhome:
189             print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
190             conflict = True
191             break
192     if conflict:
193         return None
194     if len(candidates) == 0:
195         if len(userhome) == 5 and userhome not in clusters:
196             print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
197             return None
198         print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
199         if not args.dry_run:
200             oldhomecluster = old_user_uuid[0:5]
201             oldhomearv = clusters[oldhomecluster]
202             newhomecluster = userhome[0:5]
203             homearv = clusters[userhome]
204             user = None
205             try:
206                 olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
207                 conflicts = homearv.users().list(filters=[["username", "=", username]], no_federation=True).execute()
208                 if conflicts["items"]:
209                     homearv.users().update(uuid=conflicts["items"][0]["uuid"], body={"user": {"username": username+"migrate"}}).execute()
210                 user = homearv.users().create(body={"user": {"email": email, "username": username, "is_active": olduser["is_active"]}}).execute()
211             except arvados.errors.ApiError as e:
212                 print("(%s) Could not create user: %s" % (email, str(e)))
213                 return None
214
215             tup = (email, username, user["uuid"], userhome)
216         else:
217             # dry run
218             tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
219         by_email[email][tup[2]] = tup
220         candidates.append(tup)
221     if len(candidates) > 1:
222         print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
223         return None
224     return candidates[0][2]
225
226
227 def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
228     # create a token for the new user and salt it for the
229     # migration cluster, then use it to access the migration
230     # cluster as the new user once before merging to ensure
231     # the new user is known on that cluster.
232     migratecluster = migratearv._rootDesc["uuidPrefix"]
233     try:
234         if not args.dry_run:
235             newtok = homearv.api_client_authorizations().create(body={
236                 "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
237         else:
238             newtok = {"uuid": "dry-run", "api_token": "12345"}
239     except arvados.errors.ApiError as e:
240         print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
241         return None
242
243     try:
244         findolduser = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], no_federation=True).execute()
245         if len(findolduser["items"]) == 0:
246             return False
247         if len(findolduser["items"]) == 1:
248             olduser = findolduser["items"][0]
249         else:
250             print("(%s) Unexpected result" % (email))
251             return None
252     except arvados.errors.ApiError as e:
253         print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
254         return None
255
256     salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
257                                                      msg=migratecluster.encode(),
258                                                      digestmod=hashlib.sha1).hexdigest()
259     try:
260         ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
261         if not args.dry_run:
262             newuser = arvados.api(host=ru.netloc, token=salted, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
263         else:
264             newuser = {"is_active": True, "username": username}
265     except arvados.errors.ApiError as e:
266         print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
267         return None
268
269     if not newuser["is_active"] and olduser["is_active"]:
270         print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
271         try:
272             if not args.dry_run:
273                 migratearv.users().update(uuid=new_user_uuid, body={"is_active": True}).execute()
274         except arvados.errors.ApiError as e:
275             print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
276             return None
277
278     if olduser["is_admin"] and not newuser["is_admin"]:
279         print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
280         return None
281
282     return newuser
283
284 def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
285     try:
286         if not args.dry_run:
287             grp = migratearv.groups().create(body={
288                 "owner_uuid": new_user_uuid,
289                 "name": "Migrated from %s (%s)" % (email, old_user_uuid),
290                 "group_class": "project"
291             }, ensure_unique_name=True).execute()
292             migratearv.users().merge(old_user_uuid=old_user_uuid,
293                                      new_user_uuid=new_user_uuid,
294                                      new_owner_uuid=grp["uuid"],
295                                      redirect_to_new_user=True).execute()
296     except arvados.errors.ApiError as e:
297         print("(%s) Error migrating user: %s" % (email, e))
298
299
300 def main():
301
302     parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html')
303     parser.add_argument(
304         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
305         help='Print version and exit.')
306     parser.add_argument('--tokens', type=str, required=False)
307     group = parser.add_mutually_exclusive_group(required=True)
308     group.add_argument('--report', type=str, help="Generate report .csv file listing users by email address and their associated Arvados accounts")
309     group.add_argument('--migrate', type=str, help="Consume report .csv and migrate users to designated Arvados accounts")
310     group.add_argument('--dry-run', type=str, help="Consume report .csv and report how user would be migrated to designated Arvados accounts")
311     group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected")
312     args = parser.parse_args()
313
314     clusters, errors, loginCluster = connect_clusters(args)
315
316     if errors:
317         for e in errors:
318             print("ERROR: "+str(e))
319         exit(1)
320
321     if args.check:
322         print("Tokens file passed checks")
323         exit(0)
324
325     rows, by_email, by_username = fetch_users(clusters, loginCluster)
326
327     if args.report:
328         out = csv.writer(open(args.report, "wt"))
329         out.writerow(("email", "username", "user uuid", "home cluster"))
330         for r in rows:
331             out.writerow(r)
332         print("Wrote %s" % args.report)
333         return
334
335     if args.migrate or args.dry_run:
336         if args.dry_run:
337             print("Performing dry run")
338
339         rows = read_migrations(args, by_email, by_username)
340
341         for r in rows:
342             email = r[EMAIL]
343             username = r[USERNAME]
344             old_user_uuid = r[UUID]
345             userhome = r[HOMECLUSTER]
346
347             if userhome == "":
348                 print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
349             if old_user_uuid.startswith(userhome):
350                 migratecluster = old_user_uuid[0:5]
351                 migratearv = clusters[migratecluster]
352                 if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
353                     update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
354                 continue
355
356             new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
357             if new_user_uuid is None:
358                 continue
359
360             remote_users = {}
361             got_error = False
362             for migratecluster in clusters:
363                 # cluster where the migration is happening
364                 migratearv = clusters[migratecluster]
365
366                 # the user's new home cluster
367                 newhomecluster = userhome[0:5]
368                 homearv = clusters[newhomecluster]
369
370                 newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
371                 if newuser is None:
372                     got_error = True
373                 remote_users[migratecluster] = newuser
374
375             if not got_error:
376                 for migratecluster in clusters:
377                     migratearv = clusters[migratecluster]
378                     newuser = remote_users[migratecluster]
379                     if newuser is False:
380                         continue
381
382                     print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
383
384                     migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
385
386                     if newuser['username'] != username:
387                         update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
388
389 if __name__ == "__main__":
390     main()