20640: Merge branch 'main' into 20640-computed-permissions-api
[arvados.git] / sdk / python / tests / test_computed_permissions.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import arvados
6 import arvados.util
7 from . import run_test_server
8 from .test_util import KeysetTestHelper
9
class ComputedPermissionTest(run_test_server.TestCaseWithServers):
    """Tests for the computed_permissions API and its SDK paging helpers.

    The first three tests authorize as the 'admin' fixture user and query
    through `arvados.api('v1')`.  The remaining tests pass
    `KeysetTestHelper.fn` to `arvados.util.iter_computed_permissions` in
    place of a client list method.
    """

    @staticmethod
    def _list_call(order, filters):
        # Keyword arguments expected on a single list request.  A new dict
        # (and a copy of `order`) is built on every call so that no two
        # expectations share mutable state.
        return {
            "limit": 1000,
            "count": "none",
            "order": list(order),
            "filters": filters,
        }

    def test_computed_permission(self):
        # Filtering on user_uuid must return at least one row, and every
        # returned row must belong to the requested user.
        run_test_server.authorize_with('admin')
        client = arvados.api('v1')
        wanted_uuid = run_test_server.fixture('users')['active']['uuid']
        request = client.computed_permissions().list(
            filters=[['user_uuid', '=', wanted_uuid]],
        )
        response = request.execute()
        assert len(response['items']) > 0
        for row in response['items']:
            assert row['user_uuid'] == wanted_uuid

    def test_keyset_list_all(self):
        # Paging with keyset_list_all must never yield the same
        # (user_uuid, target_uuid) pair twice.
        run_test_server.authorize_with('admin')
        client = arvados.api('v1')
        pairs = set()
        rows = arvados.util.keyset_list_all(
            client.computed_permissions().list,
            order_key='user_uuid',
            key_fields=('user_uuid', 'target_uuid'),
        )
        for row in rows:
            pair = (row['user_uuid'], row['target_uuid'])
            assert pair not in pairs
            pairs.add(pair)

    def test_iter_computed_permissions(self):
        # Every row from iter_computed_permissions must carry a truthy
        # perm_level and a (user_uuid, target_uuid) pair not seen before.
        run_test_server.authorize_with('admin')
        client = arvados.api('v1')
        pairs = set()
        rows = arvados.util.iter_computed_permissions(
            client.computed_permissions().list,
        )
        for row in rows:
            assert row['perm_level']
            pair = (row['user_uuid'], row['target_uuid'])
            assert pair not in pairs
            pairs.add(pair)

    def test_iter_computed_permissions_defaults(self):
        # Each entry below is an [expected kwargs, canned response] pair.
        # NOTE(review): KeysetTestHelper presumably checks the calls in this
        # order -- confirm in test_util.
        order = ["user_uuid asc", "target_uuid asc"]
        same_user_later_targets = [
            ['user_uuid', '=', 'u'],
            ['target_uuid', '>', 't'],
        ]
        later_users = [['user_uuid', '>', 'u']]
        ks = KeysetTestHelper([
            [
                self._list_call(order, []),
                {"items": [{"user_uuid": "u", "target_uuid": "t"}]},
            ],
            [
                self._list_call(order, same_user_later_targets),
                {"items": []},
            ],
            [
                self._list_call(order, later_users),
                {"items": []},
            ],
        ])
        results = list(arvados.util.iter_computed_permissions(ks.fn))
        assert results == ks.expect[0][1]['items']

    def test_iter_computed_permissions_order_key(self):
        # Same shape as the defaults test, but ordering by target_uuid
        # descending: the order clauses and comparison operators flip.
        order = ["target_uuid desc", "user_uuid desc"]
        same_target_earlier_users = [
            ['target_uuid', '=', 't'],
            ['user_uuid', '<', 'u'],
        ]
        earlier_targets = [['target_uuid', '<', 't']]
        ks = KeysetTestHelper([
            [
                self._list_call(order, []),
                {"items": [{"user_uuid": "u", "target_uuid": "t"}]},
            ],
            [
                self._list_call(order, same_target_earlier_users),
                {"items": []},
            ],
            [
                self._list_call(order, earlier_targets),
                {"items": []},
            ],
        ])
        results = list(
            arvados.util.iter_computed_permissions(
                ks.fn,
                order_key='target_uuid',
                ascending=False,
            )
        )
        assert results == ks.expect[0][1]['items']

    def test_iter_computed_permissions_num_retries(self):
        # The helper is built with expect_num_retries=33 and the iterator is
        # called with num_retries=33; an empty first page yields no rows.
        only_call = [
            self._list_call(["user_uuid asc", "target_uuid asc"], []),
            {"items": []},
        ]
        ks = KeysetTestHelper([only_call], expect_num_retries=33)
        results = list(
            arvados.util.iter_computed_permissions(ks.fn, num_retries=33)
        )
        assert results == []

    def test_iter_computed_permissions_invalid_key_fields(self):
        # A key_fields value of ['target_uuid', 'perm_level'] must be
        # rejected with ArgumentError carrying the exact message below.
        ks = KeysetTestHelper([])
        bad_key_fields = ['target_uuid', 'perm_level']
        with self.assertRaises(arvados.errors.ArgumentError) as caught:
            # list() consumes the iterator so the error actually surfaces.
            list(
                arvados.util.iter_computed_permissions(
                    ks.fn,
                    key_fields=bad_key_fields,
                )
            )
        assert caught.exception.args[0] == 'key_fields can have at most one entry that is not order_key'