CWL spec -> CWL standards
[arvados.git] / sdk / python / arvados / commands / federation_migrate.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 #
7 # Migration tool for merging user accounts belonging to the same user
8 # but on separate clusters to use a single user account managed by a
9 # specific cluster.
10 #
11 # If you're working on this, see
12 # arvados/sdk/python/tests/fed-migrate/README for information about
13 # the testing infrastructure.
14
15 import arvados
16 import arvados.util
17 import arvados.errors
18 import csv
19 import sys
20 import argparse
21 import hmac
22 import urllib.parse
23 import os
24 import hashlib
25 from arvados._version import __version__
26
27 EMAIL=0
28 USERNAME=1
29 UUID=2
30 HOMECLUSTER=3
31
32 def connect_clusters(args):
33     clusters = {}
34     errors = []
35     loginCluster = None
36     if args.tokens:
37         print("Reading %s" % args.tokens)
38         with open(args.tokens, "rt") as f:
39             for r in csv.reader(f):
40                 if len(r) != 2:
41                     continue
42                 host = r[0]
43                 token = r[1]
44                 print("Contacting %s" % (host))
45                 arv = arvados.api(host=host, token=token, cache=False)
46                 clusters[arv._rootDesc["uuidPrefix"]] = arv
47     else:
48         arv = arvados.api(cache=False)
49         rh = arv._rootDesc["remoteHosts"]
50         tok = arv.api_client_authorizations().current().execute()
51         token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
52
53         for k,v in rh.items():
54             arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
55             clusters[k] = arv
56
57     for _, arv in clusters.items():
58         config = arv.configs().get().execute()
59         if config["Login"]["LoginCluster"] != "" and loginCluster is None:
60             loginCluster = config["Login"]["LoginCluster"]
61
62     print("Checking that the federation is well connected")
63     for arv in clusters.values():
64         config = arv.configs().get().execute()
65         if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
66             errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
67             continue
68
69         if arv._rootDesc["revision"] < "20200331":
70             errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
71             continue
72
73         try:
74             cur = arv.users().current().execute()
75         except arvados.errors.ApiError as e:
76             errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
77             continue
78
79         if not cur["is_admin"]:
80             errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
81             continue
82
83         for r in clusters:
84             if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
85                 errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
86         for r in arv._rootDesc["remoteHosts"]:
87             if r != "*" and r not in clusters:
88                 print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
89
90     return clusters, errors, loginCluster
91
92
93 def fetch_users(clusters, loginCluster):
94     rows = []
95     by_email = {}
96     by_username = {}
97
98     users = []
99     for c, arv in clusters.items():
100         print("Getting user list from %s" % c)
101         ul = arvados.util.list_all(arv.users().list, bypass_federation=True)
102         for l in ul:
103             if l["uuid"].startswith(c):
104                 users.append(l)
105
106     # Users list is sorted by email
107     # Go through users and collect users with same email
108     # when we see a different email (or get to the end)
109     # call add_accum_rows() to generate the report rows with
110     # the "home cluster" set, and also fill in the by_email table.
111
112     users = sorted(users, key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])
113
114     accum = []
115     lastemail = None
116
117     def add_accum_rows():
118         homeuuid = None
119         for a in accum:
120             uuids = set(a["uuid"] for a in accum)
121             homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
122         for a in accum:
123             r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
124             by_email.setdefault(a["email"], {})
125             by_email[a["email"]][a["uuid"]] = r
126             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
127             if homeuuid_and_username not in by_username:
128                 by_username[homeuuid_and_username] = a["email"]
129             elif by_username[homeuuid_and_username] != a["email"]:
130                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
131                 exit(1)
132             rows.append(r)
133
134     for u in users:
135         if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
136             continue
137         if lastemail == None:
138             lastemail = u["email"]
139         if u["email"] == lastemail:
140             accum.append(u)
141         else:
142             add_accum_rows()
143             lastemail = u["email"]
144             accum = [u]
145
146     add_accum_rows()
147
148     return rows, by_email, by_username
149
150
151 def read_migrations(args, by_email, by_username):
152     rows = []
153     with open(args.migrate or args.dry_run, "rt") as f:
154         for r in csv.reader(f):
155             if r[EMAIL] == "email":
156                 continue
157             by_email.setdefault(r[EMAIL], {})
158             by_email[r[EMAIL]][r[UUID]] = r
159
160             homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
161             if homeuuid_and_username not in by_username:
162                 by_username[homeuuid_and_username] = r[EMAIL]
163             elif by_username[homeuuid_and_username] != r[EMAIL]:
164                 print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
165                 exit(1)
166
167             rows.append(r)
168     return rows
169
170 def update_username(args, email, user_uuid, username, migratecluster, migratearv):
171     print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
172     if not args.dry_run:
173         try:
174             conflicts = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()
175             if conflicts["items"]:
176                 # There's already a user with the username, move the old user out of the way
177                 migratearv.users().update(uuid=conflicts["items"][0]["uuid"],
178                                           bypass_federation=True,
179                                           body={"user": {"username": username+"migrate"}}).execute()
180             migratearv.users().update(uuid=user_uuid,
181                                       bypass_federation=True,
182                                       body={"user": {"username": username}}).execute()
183         except arvados.errors.ApiError as e:
184             print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
185
186
187 def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
188     candidates = []
189     conflict = False
190     for b in by_email[email].values():
191         if b[2].startswith(userhome):
192             candidates.append(b)
193         if b[1] != username and b[3] == userhome:
194             print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
195             conflict = True
196             break
197     if conflict:
198         return None
199     if len(candidates) == 0:
200         if len(userhome) == 5 and userhome not in clusters:
201             print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
202             return None
203         print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
204         if not args.dry_run:
205             oldhomecluster = old_user_uuid[0:5]
206             oldhomearv = clusters[oldhomecluster]
207             newhomecluster = userhome[0:5]
208             homearv = clusters[userhome]
209             user = None
210             try:
211                 olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
212                 conflicts = homearv.users().list(filters=[["username", "=", username]],
213                                                  bypass_federation=True).execute()
214                 if conflicts["items"]:
215                     homearv.users().update(uuid=conflicts["items"][0]["uuid"],
216                                            bypass_federation=True,
217                                            body={"user": {"username": username+"migrate"}}).execute()
218                 user = homearv.users().create(body={"user": {"email": email, "username": username,
219                                                              "is_active": olduser["is_active"]}}).execute()
220             except arvados.errors.ApiError as e:
221                 print("(%s) Could not create user: %s" % (email, str(e)))
222                 return None
223
224             tup = (email, username, user["uuid"], userhome)
225         else:
226             # dry run
227             tup = (email, username, "%s-tpzed-xfakexfakexfake" % (userhome[0:5]), userhome)
228         by_email[email][tup[2]] = tup
229         candidates.append(tup)
230     if len(candidates) > 1:
231         print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
232         return None
233     return candidates[0][2]
234
235
236 def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
237     # create a token for the new user and salt it for the
238     # migration cluster, then use it to access the migration
239     # cluster as the new user once before merging to ensure
240     # the new user is known on that cluster.
241     migratecluster = migratearv._rootDesc["uuidPrefix"]
242     try:
243         if not args.dry_run:
244             newtok = homearv.api_client_authorizations().create(body={
245                 "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
246         else:
247             newtok = {"uuid": "dry-run", "api_token": "12345"}
248     except arvados.errors.ApiError as e:
249         print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
250         return None
251
252     try:
253         findolduser = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()
254         if len(findolduser["items"]) == 0:
255             return False
256         if len(findolduser["items"]) == 1:
257             olduser = findolduser["items"][0]
258         else:
259             print("(%s) Unexpected result" % (email))
260             return None
261     except arvados.errors.ApiError as e:
262         print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
263         return None
264
265     salted = 'v2/' + newtok["uuid"] + '/' + hmac.new(newtok["api_token"].encode(),
266                                                      msg=migratecluster.encode(),
267                                                      digestmod=hashlib.sha1).hexdigest()
268     try:
269         ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
270         if not args.dry_run:
271             newuser = arvados.api(host=ru.netloc, token=salted,
272                                   insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
273         else:
274             newuser = {"is_active": True, "username": username}
275     except arvados.errors.ApiError as e:
276         print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
277         return None
278
279     if not newuser["is_active"] and olduser["is_active"]:
280         print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
281         try:
282             if not args.dry_run:
283                 migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
284                                           body={"is_active": True}).execute()
285         except arvados.errors.ApiError as e:
286             print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
287             return None
288
289     if olduser["is_admin"] and not newuser["is_admin"]:
290         print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
291         return None
292
293     return newuser
294
295 def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
296     try:
297         if not args.dry_run:
298             grp = migratearv.groups().create(body={
299                 "owner_uuid": new_user_uuid,
300                 "name": "Migrated from %s (%s)" % (email, old_user_uuid),
301                 "group_class": "project"
302             }, ensure_unique_name=True).execute()
303             migratearv.users().merge(old_user_uuid=old_user_uuid,
304                                      new_user_uuid=new_user_uuid,
305                                      new_owner_uuid=grp["uuid"],
306                                      redirect_to_new_user=True).execute()
307     except arvados.errors.ApiError as e:
308         print("(%s) Error migrating user: %s" % (email, e))
309
310
311 def main():
312
313     parser = argparse.ArgumentParser(description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html')
314     parser.add_argument(
315         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
316         help='Print version and exit.')
317     parser.add_argument('--tokens', type=str, required=False)
318     group = parser.add_mutually_exclusive_group(required=True)
319     group.add_argument('--report', type=str, help="Generate report .csv file listing users by email address and their associated Arvados accounts")
320     group.add_argument('--migrate', type=str, help="Consume report .csv and migrate users to designated Arvados accounts")
321     group.add_argument('--dry-run', type=str, help="Consume report .csv and report how user would be migrated to designated Arvados accounts")
322     group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected")
323     args = parser.parse_args()
324
325     clusters, errors, loginCluster = connect_clusters(args)
326
327     if errors:
328         for e in errors:
329             print("ERROR: "+str(e))
330         exit(1)
331
332     if args.check:
333         print("Tokens file passed checks")
334         exit(0)
335
336     rows, by_email, by_username = fetch_users(clusters, loginCluster)
337
338     if args.report:
339         out = csv.writer(open(args.report, "wt"))
340         out.writerow(("email", "username", "user uuid", "home cluster"))
341         for r in rows:
342             out.writerow(r)
343         print("Wrote %s" % args.report)
344         return
345
346     if args.migrate or args.dry_run:
347         if args.dry_run:
348             print("Performing dry run")
349
350         rows = read_migrations(args, by_email, by_username)
351
352         for r in rows:
353             email = r[EMAIL]
354             username = r[USERNAME]
355             old_user_uuid = r[UUID]
356             userhome = r[HOMECLUSTER]
357
358             if userhome == "":
359                 print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
360             if old_user_uuid.startswith(userhome):
361                 migratecluster = old_user_uuid[0:5]
362                 migratearv = clusters[migratecluster]
363                 if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
364                     update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
365                 continue
366
367             new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
368             if new_user_uuid is None:
369                 continue
370
371             remote_users = {}
372             got_error = False
373             for migratecluster in clusters:
374                 # cluster where the migration is happening
375                 migratearv = clusters[migratecluster]
376
377                 # the user's new home cluster
378                 newhomecluster = userhome[0:5]
379                 homearv = clusters[newhomecluster]
380
381                 newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
382                 if newuser is None:
383                     got_error = True
384                 remote_users[migratecluster] = newuser
385
386             if not got_error:
387                 for migratecluster in clusters:
388                     migratearv = clusters[migratecluster]
389                     newuser = remote_users[migratecluster]
390                     if newuser is False:
391                         continue
392
393                     print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
394
395                     migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
396
397                     if newuser['username'] != username:
398                         update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
399
400 if __name__ == "__main__":
401     main()