e533a642ce902d84bc5f7a8529897ba7f179ae33
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 #
7 # Migration tool for merging user accounts belonging to the same user
8 # but on separate clusters to use a single user account managed by a
9 # specific cluster.
10 #
11 # If you're working on this, see
12 # arvados/sdk/python/tests/fed-migrate/README for information about
13 # the testing infrastructure.
14
15 import arvados
16 import arvados.util
17 import arvados.errors
18 import csv
19 import sys
20 import argparse
21 import hmac
22 import urllib.parse
23 import os
24 from arvados._version import __version__
25
26 EMAIL=0
27 USERNAME=1
28 UUID=2
29 HOMECLUSTER=3
30
31 def connect_clusters(args):
32     clusters = {}
33     errors = []
34     loginCluster = None
35     if args.tokens:
36         print("Reading %s" % args.tokens)
37         with open(args.tokens, "rt") as f:
38             for r in csv.reader(f):
39                 if len(r) != 2:
40                     continue
41                 host = r[0]
42                 token = r[1]
43                 print("Contacting %s" % (host))
44                 arv = arvados.api(host=host, token=token, cache=False)
45                 clusters[arv._rootDesc["uuidPrefix"]] = arv
46     else:
47         arv = arvados.api(cache=False)
48         rh = arv._rootDesc["remoteHosts"]
49         tok = arv.api_client_authorizations().current().execute()
50         token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
51
52         for k,v in rh.items():
53             arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
54             clusters[k] = arv
55
56     for _, arv in clusters.items():
57         config = arv.configs().get().execute()
58         if config["Login"]["LoginCluster"] != "" and loginCluster is None:
59             loginCluster = config["Login"]["LoginCluster"]
60
61     print("Checking that the federation is well connected")
62     for arv in clusters.values():
63         config = arv.configs().get().execute()
64         if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
65             errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
66             continue
67
68         if arv._rootDesc["revision"] < "20190926":
69             errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 1.5 before running migration." % config["ClusterID"])
70             continue
71
72         try:
73             cur = arv.users().current().execute()
74         except arvados.errors.ApiError as e:
75             errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
76             continue
77
78         if not cur["is_admin"]:
79             errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
80             continue
81
82         for r in clusters:
83             if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
84                 errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
85         for r in arv._rootDesc["remoteHosts"]:
86             if r != "*" and r not in clusters:
87                 print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
88
89     return clusters, errors, loginCluster
90
91
92 def fetch_users(clusters, loginCluster):
93     rows = []
94     by_email = {}
95     by_username = {}
96
97     users = []
98     for c, arv in clusters.items():
99         print("Getting user list from %s" % c)
100         ul = arvados.util.list_all(arv.users().list)
101         for l in ul:
102             if l["uuid"].startswith(c):
103                 users.append(l)
104
105     # Users list is sorted by email
106     # Go through users and collect users with same email
107     # when we see a different email (or get to the end)
108     # call add_accum_rows() to generate the report rows with
109     # the "home cluster" set, and also fill in the by_email table.
110
111     users = sorted(users, key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])
112
113     accum = []
114     lastemail = None
115
116     def add_accum_rows():
117         homeuuid = None
118         for a in accum:
119             uuids = set(a["uuid"] for a in accum)
120             homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
121         for a in accum:
122             r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
123             by_email.setdefault(a["email"], {})
124             by_email[a["email"]][a["uuid"]] = r
125             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
126             if homeuuid_and_username not in by_username:
127                 by_username[homeuuid_and_username] = a["email"]
128             elif by_username[homeuuid_and_username] != a["email"]:
129                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
130                 exit(1)
131             rows.append(r)
132
133     for u in users:
134         if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
135             continue
136         if lastemail == None:
137             lastemail = u["email"]
138         if u["email"] == lastemail:
139             accum.append(u)
140         else:
141             add_accum_rows()
142             lastemail = u["email"]
143             accum = [u]
144
145     add_accum_rows()
146
147     return rows, by_email, by_username
148
149
150 def read_migrations(args, by_email, by_username):
151     rows = []
152     with open(args.migrate or args.dry_run, "rt") as f:
153         for r in csv.reader(f):
154             if r[EMAIL] == "email":
155                 continue
156             by_email.setdefault(r[EMAIL], {})
157             by_email[r[EMAIL]][r[UUID]] = r
158
159             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
160             if homeuuid_and_username not in by_username:
161                 by_username[homeuuid_and_username] = r[EMAIL]
162             elif by_username[homeuuid_and_username] != r[EMAIL]:
163                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
164                 exit(1)
165
166             rows.append(r)
167     return rows
168
169 def update_username(args, email, user_uuid, username, migratecluster, migratearv):
170     print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
171     if not args.dry_run:
172         try:
173             conflicts = migratearv.users().list(filters=[["username", "=", username]]).execute()
174             if conflicts["items"]:
175                 migratearv.users().update(uuid=conflicts["items"][0]["uuid"], body={"user": {"username": username+"migrate"}}).execute()
176             migratearv.users().update(uuid=user_uuid, body={"user": {"username": username}}).execute()
177         except arvados.errors.ApiError as e:
178             print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
179
180
181 def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
182     candidates = []
183     conflict = False
184     for b in by_email[email].values():
185         if b[2].startswith(userhome):
186             candidates.append(b)
187         if b[1] != username and b[3] == userhome:
188             print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
189             conflict = True
190             break
191     if conflict:
192         return None
193     if len(candidates) == 0:
194         if len(userhome) == 5 and userhome not in clusters:
195             print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
196             return None
197         print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
198         if not args.dry_run:
199             newhomecluster = userhome[0:5]
200             homearv = clusters[userhome]
201             user = None
202             try:
203                 conflicts = homearv.users().list(filters=[["username", "=", username]]).execute()
204                 if conflicts["items"]:
205                     homearv.users().update(uuid=conflicts["items"][0]["uuid"], body={"user": {"username": username+"migrate"}}).execute()
206                 user = homearv.users().create(body={"user": {"email": email, "username": username}}).execute()
207             except arvados.errors.ApiError as e:
208                 print("(%s) Could not create user: %s" % (email, str(e)))
209                 return None
210
211             tup = (email, username, user["uuid"], userhome)
212         else:
213             # dry run
214             tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
215         by_email[email][tup[2]] = tup
216         candidates.append(tup)
217     if len(candidates) > 1:
218         print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
219         return None
220     return candidates[0][2]
221
222
223 def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
224     # create a token for the new user and salt it for the
225     # migration cluster, then use it to access the migration
226     # cluster as the new user once before merging to ensure
227     # the new user is known on that cluster.
228     migratecluster = migratearv._rootDesc["uuidPrefix"]
229     try:
230         if not args.dry_run:
231             newtok = homearv.api_client_authorizations().create(body={
232                 "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
233         else:
234             newtok = {"uuid": "dry-run", "api_token": "12345"}
235     except arvados.errors.ApiError as e:
236         print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
237         return None
238
239     salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
240                                                      msg=migratecluster.encode(),
241                                                      digestmod='sha1').hexdigest()
242     try:
243         ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
244         if not args.dry_run:
245             newuser = arvados.api(host=ru.netloc, token=salted, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
246         else:
247             newuser = {"is_active": True, "username": username}
248     except arvados.errors.ApiError as e:
249         print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
250         return None
251
252     try:
253         olduser = migratearv.users().get(uuid=old_user_uuid).execute()
254     except arvados.errors.ApiError as e:
255         if e.resp.status != 404:
256             print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
257         return None
258
259     if not newuser["is_active"]:
260         print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
261         try:
262             if not args.dry_run:
263                 migratearv.users().update(uuid=new_user_uuid, body={"is_active": True}).execute()
264         except arvados.errors.ApiError as e:
265             print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
266             return None
267
268     if olduser["is_admin"] and not newuser["is_admin"]:
269         print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
270         return None
271
272     return newuser
273
274 def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
275     try:
276         if not args.dry_run:
277             grp = migratearv.groups().create(body={
278                 "owner_uuid": new_user_uuid,
279                 "name": "Migrated from %s (%s)" % (email, old_user_uuid),
280                 "group_class": "project"
281             }, ensure_unique_name=True).execute()
282             migratearv.users().merge(old_user_uuid=old_user_uuid,
283                                      new_user_uuid=new_user_uuid,
284                                      new_owner_uuid=grp["uuid"],
285                                      redirect_to_new_user=True).execute()
286     except arvados.errors.ApiError as e:
287         print("(%s) Error migrating user: %s" % (email, e))
288
289
290 def main():
291
292     parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html')
293     parser.add_argument(
294         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
295         help='Print version and exit.')
296     parser.add_argument('--tokens', type=str, required=False)
297     group = parser.add_mutually_exclusive_group(required=True)
298     group.add_argument('--report', type=str, help="Generate report .csv file listing users by email address and their associated Arvados accounts")
299     group.add_argument('--migrate', type=str, help="Consume report .csv and migrate users to designated Arvados accounts")
300     group.add_argument('--dry-run', type=str, help="Consume report .csv and report how user would be migrated to designated Arvados accounts")
301     group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected")
302     args = parser.parse_args()
303
304     clusters, errors, loginCluster = connect_clusters(args)
305
306     if errors:
307         for e in errors:
308             print("ERROR: "+str(e))
309         exit(1)
310
311     if args.check:
312         print("Tokens file passed checks")
313         exit(0)
314
315     rows, by_email, by_username = fetch_users(clusters, loginCluster)
316
317     if args.report:
318         out = csv.writer(open(args.report, "wt"))
319         out.writerow(("email", "username", "user uuid", "home cluster"))
320         for r in rows:
321             out.writerow(r)
322         print("Wrote %s" % args.report)
323         return
324
325     if args.migrate or args.dry_run:
326         if args.dry_run:
327             print("Performing dry run")
328
329         rows = read_migrations(args, by_email, by_username)
330
331         for r in rows:
332             email = r[EMAIL]
333             username = r[USERNAME]
334             old_user_uuid = r[UUID]
335             userhome = r[HOMECLUSTER]
336
337             if userhome == "":
338                 print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
339             if old_user_uuid.startswith(userhome):
340                 migratecluster = old_user_uuid[0:5]
341                 migratearv = clusters[migratecluster]
342                 if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
343                     update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
344                 continue
345
346             new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
347             if new_user_uuid is None:
348                 continue
349
350             # cluster where the migration is happening
351             for migratecluster in clusters:
352                 migratearv = clusters[migratecluster]
353
354                 # the user's new home cluster
355                 newhomecluster = userhome[0:5]
356                 homearv = clusters[newhomecluster]
357
358                 newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
359                 if newuser is None:
360                     continue
361
362                 print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
363
364                 migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
365
366                 if newuser['username'] != username:
367                     update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
368
369 if __name__ == "__main__":
370     main()