21935: Merge _normalize_stream and _ranges into arvados._internal.streams
[arvados.git] / sdk / python / tests / test_util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import itertools
6 import os
7 import stat
8 import subprocess
9 import unittest
10
11 import parameterized
12 import pytest
13 from pathlib import Path
14 from unittest import mock
15
16 import arvados
17 import arvados.util
18
19
20 class KeysetTestHelper:
21     def __init__(self, expect, expect_num_retries=0):
22         self.n = 0
23         self.expect = expect
24         self.expect_num_retries = expect_num_retries
25
26     def fn(self, **kwargs):
27         assert kwargs == self.expect[self.n][0]
28         return self
29
30     def execute(self, num_retries):
31         assert num_retries == self.expect_num_retries
32         self.n += 1
33         return self.expect[self.n-1][1]
34
35 _SELECT_FAKE_ITEM = {
36     'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
37     'name': 'KeysetListAllTestCase.test_select mock',
38     'created_at': '2023-08-28T12:34:56.123456Z',
39 }
40
41 _FAKE_COMPUTED_PERMISSIONS_ITEM = {
42     'user_uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
43     'target_uuid': 'zzzzz-ttttt-xxxxxyyyyyzzzzz',
44     'perm_level': 'can_write',
45 }
46
47 class KeysetListAllTestCase(unittest.TestCase):
48     def test_empty(self):
49         ks = KeysetTestHelper([[
50             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
51             {"items": []}
52         ]])
53
54         ls = list(arvados.util.keyset_list_all(ks.fn))
55         self.assertEqual(ls, [])
56
57     def test_oneitem(self):
58         ks = KeysetTestHelper([[
59             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
60             {"items": [{"created_at": "1", "uuid": "1"}]}
61         ], [
62             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]},
63             {"items": []}
64         ],[
65             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]},
66             {"items": []}
67         ]])
68
69         ls = list(arvados.util.keyset_list_all(ks.fn))
70         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}])
71
72     def test_onepage2(self):
73         ks = KeysetTestHelper([[
74             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
75             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
76         ], [
77             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
78             {"items": []}
79         ]])
80
81         ls = list(arvados.util.keyset_list_all(ks.fn))
82         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
83
84     def test_onepage3(self):
85         ks = KeysetTestHelper([[
86             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
87             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]}
88         ], [
89             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]},
90             {"items": []}
91         ]])
92
93         ls = list(arvados.util.keyset_list_all(ks.fn))
94         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}])
95
96
97     def test_twopage(self):
98         ks = KeysetTestHelper([[
99             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
100             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
101         ], [
102             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
103             {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]}
104         ], [
105             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]},
106             {"items": []}
107         ]])
108
109         ls = list(arvados.util.keyset_list_all(ks.fn))
110         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
111                               {"created_at": "2", "uuid": "2"},
112                               {"created_at": "3", "uuid": "3"},
113                               {"created_at": "4", "uuid": "4"}
114         ])
115
116     def test_repeated_key(self):
117         ks = KeysetTestHelper([[
118             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
119             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]}
120         ], [
121             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]},
122             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]}
123         ], [
124             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]},
125             {"items": []}
126         ], [
127             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]},
128             {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]}
129         ], [
130             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]},
131             {"items": []}
132         ],
133         ])
134
135         ls = list(arvados.util.keyset_list_all(ks.fn))
136         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
137                               {"created_at": "2", "uuid": "2"},
138                               {"created_at": "2", "uuid": "3"},
139                               {"created_at": "2", "uuid": "4"},
140                               {"created_at": "3", "uuid": "5"},
141                               {"created_at": "4", "uuid": "6"}
142         ])
143
144     def test_onepage_withfilter(self):
145         ks = KeysetTestHelper([[
146             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]},
147             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
148         ], [
149             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]},
150             {"items": []}
151         ]])
152
153         ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
154         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
155
156     def test_onepage_desc(self):
157         ks = KeysetTestHelper([[
158             {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
159             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
160         ], [
161             {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
162             {"items": []}
163         ]])
164
165         ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
166         self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])
167
168     @parameterized.parameterized.expand(
169         (fake_item, key_fields, order_key, select)
170         for (fake_item, key_fields) in [
171             (_SELECT_FAKE_ITEM, ('uuid',)),
172             (_FAKE_COMPUTED_PERMISSIONS_ITEM, ('user_uuid', 'target_uuid')),
173         ]
174         for order_key in fake_item
175         if order_key != 'perm_level'
176         for count in range(len(fake_item) + 1)
177         for select in itertools.combinations(fake_item, count)
178     )
179     def test_select(self, fake_item, key_fields, order_key, select):
180         # keyset_list_all must have both uuid and order_key to function.
181         # Test that it selects those fields along with user-specified ones.
182         expect_select = {*key_fields, order_key, *select}
183         item = {
184             key: value
185             for key, value in fake_item.items()
186             if key in expect_select
187         }
188         list_func = mock.Mock()
189         list_func().execute = mock.Mock(
190             side_effect=[
191                 {'items': [item]},
192                 {'items': []},
193                 {'items': []},
194             ],
195         )
196         list_func.reset_mock()
197         actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select), key_fields=key_fields))
198         self.assertEqual(actual, [item])
199         calls = list_func.call_args_list
200         self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
201         for args, kwargs in calls:
202             self.assertEqual(set(kwargs.get('select', ())), expect_select)
203
204
205 class TestBaseDirectories:
206     SELF_PATH = Path(__file__)
207
208     @pytest.fixture
209     def dir_spec(self, tmp_path):
210         return arvados.util._BaseDirectorySpec(
211             'TEST_DIRECTORY',
212             'XDG_TEST_HOME',
213             Path('.test'),
214             'XDG_TEST_DIRS',
215             f"{tmp_path / '.test1'}:{tmp_path / '.test2'}",
216         )
217
218     @pytest.fixture
219     def env(self, tmp_path):
220         return {'HOME': str(tmp_path)}
221
222     @pytest.fixture
223     def umask(self):
224         orig_umask = os.umask(0o002)
225         try:
226             yield
227         finally:
228             os.umask(orig_umask)
229
230     def test_search_systemd_dirs(self, dir_spec, env, tmp_path):
231         env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
232         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
233         actual = list(dirs.search(self.SELF_PATH.name))
234         assert actual == [self.SELF_PATH]
235
236     def test_search_xdg_home(self, dir_spec, env, tmp_path):
237         env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent)
238         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
239         actual = list(dirs.search(self.SELF_PATH.name))
240         assert actual == [self.SELF_PATH]
241
242     def test_search_xdg_dirs(self, dir_spec, env, tmp_path):
243         env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
244         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
245         actual = list(dirs.search(self.SELF_PATH.name))
246         assert actual == [self.SELF_PATH]
247
248     def test_search_all_dirs(self, dir_spec, env, tmp_path):
249         env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
250         env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent)
251         env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
252         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
253         actual = list(dirs.search(self.SELF_PATH.name))
254         assert actual == [self.SELF_PATH, self.SELF_PATH, self.SELF_PATH]
255
256     def test_search_default_home(self, dir_spec, env, tmp_path):
257         expected = tmp_path / dir_spec.xdg_home_default / 'default_home'
258         expected.parent.mkdir()
259         expected.touch()
260         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
261         actual = list(dirs.search(expected.name))
262         assert actual == [expected]
263
264     def test_search_default_dirs(self, dir_spec, env, tmp_path):
265         _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
266         expected = Path(default_dir, 'default_dirs')
267         expected.parent.mkdir()
268         expected.touch()
269         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
270         actual = list(dirs.search(expected.name))
271         assert actual == [expected]
272
273     def test_search_no_default_dirs(self, dir_spec, env, tmp_path):
274         dir_spec.xdg_dirs_key = None
275         dir_spec.xdg_dirs_default = None
276         for subdir in ['.test1', '.test2', dir_spec.xdg_home_default]:
277             expected = tmp_path / subdir / 'no_dirs'
278             expected.parent.mkdir()
279             expected.touch()
280         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
281         actual = list(dirs.search(expected.name))
282         assert actual == [expected]
283
284     def test_ignore_relative_directories(self, dir_spec, env, tmp_path):
285         test_path = Path(*self.SELF_PATH.parts[-2:])
286         assert test_path.exists(), "test setup problem: need an existing file in a subdirectory of ."
287         parent_path = str(test_path.parent)
288         env['TEST_DIRECTORY'] = '.'
289         env['XDG_TEST_HOME'] = parent_path
290         env['XDG_TEST_DIRS'] = parent_path
291         dirs = arvados.util._BaseDirectories(dir_spec, env, parent_path)
292         assert not list(dirs.search(test_path.name))
293
294     def test_search_warns_nondefault_home(self, dir_spec, env, tmp_path, caplog):
295         search_path = tmp_path / dir_spec.xdg_home_default / 'Search' / 'SearchConfig'
296         search_path.parent.mkdir(parents=True)
297         search_path.touch()
298         env[dir_spec.xdg_home_key] = str(tmp_path / '.nonexistent')
299         dirs = arvados.util._BaseDirectories(dir_spec, env, search_path.parent.name)
300         results = list(dirs.search(search_path.name))
301         expect_msg = "{} was not found under your configured ${} ({}), but does exist at the default location ({})".format(
302             Path(*search_path.parts[-2:]),
303             dir_spec.xdg_home_key,
304             env[dir_spec.xdg_home_key],
305             Path(*search_path.parts[:-2]),
306         )
307         assert caplog.messages
308         assert any(msg.startswith(expect_msg) for msg in caplog.messages)
309         assert not results
310
311     def test_storage_path_systemd(self, dir_spec, env, tmp_path):
312         expected = tmp_path / 'rwsystemd'
313         expected.mkdir(0o700)
314         env['TEST_DIRECTORY'] = str(expected)
315         dirs = arvados.util._BaseDirectories(dir_spec, env)
316         assert dirs.storage_path() == expected
317
318     def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path):
319         rodir = tmp_path / 'rodir'
320         rodir.mkdir(0o500)
321         expected = tmp_path / 'rwdir'
322         expected.mkdir(0o700)
323         env['TEST_DIRECTORY'] = f'{rodir}:{expected}'
324         dirs = arvados.util._BaseDirectories(dir_spec, env)
325         assert dirs.storage_path() == expected
326
327     def test_storage_path_xdg_home(self, dir_spec, env, tmp_path):
328         expected = tmp_path / '.xdghome' / 'arvados'
329         env['XDG_TEST_HOME'] = str(expected.parent)
330         dirs = arvados.util._BaseDirectories(dir_spec, env)
331         assert dirs.storage_path() == expected
332         exp_mode = stat.S_IFDIR | stat.S_IWUSR
333         assert (expected.stat().st_mode & exp_mode) == exp_mode
334
335     def test_storage_path_default(self, dir_spec, env, tmp_path):
336         expected = tmp_path / dir_spec.xdg_home_default / 'arvados'
337         dirs = arvados.util._BaseDirectories(dir_spec, env)
338         assert dirs.storage_path() == expected
339         exp_mode = stat.S_IFDIR | stat.S_IWUSR
340         assert (expected.stat().st_mode & exp_mode) == exp_mode
341
342     @pytest.mark.parametrize('subdir,mode', [
343         ('str/dir', 0o750),
344         (Path('sub', 'path'), 0o770),
345     ])
346     def test_storage_path_subdir(self, dir_spec, env, umask, tmp_path, subdir, mode):
347         expected = tmp_path / dir_spec.xdg_home_default / 'arvados' / subdir
348         dirs = arvados.util._BaseDirectories(dir_spec, env)
349         actual = dirs.storage_path(subdir, mode)
350         assert actual == expected
351         expect_mode = mode | stat.S_IFDIR
352         actual_mode = actual.stat().st_mode
353         assert (actual_mode & expect_mode) == expect_mode
354         assert not (actual_mode & stat.S_IRWXO)
355
356     def test_empty_xdg_home(self, dir_spec, env, tmp_path):
357         env['XDG_TEST_HOME'] = ''
358         expected = tmp_path / dir_spec.xdg_home_default / 'emptyhome'
359         dirs = arvados.util._BaseDirectories(dir_spec, env, expected.name)
360         assert dirs.storage_path() == expected
361
362     def test_empty_xdg_dirs(self, dir_spec, env, tmp_path):
363         env['XDG_TEST_DIRS'] = ''
364         _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
365         expected = Path(default_dir, 'empty_dirs')
366         expected.parent.mkdir()
367         expected.touch()
368         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
369         actual = list(dirs.search(expected.name))
370         assert actual == [expected]
371
372     def test_spec_key_lookup(self):
373         dirs = arvados.util._BaseDirectories('CACHE')
374         assert dirs._spec.systemd_key == 'CACHE_DIRECTORY'
375         assert dirs._spec.xdg_dirs_key is None
376
377     def test_spec_enum_lookup(self):
378         dirs = arvados.util._BaseDirectories(arvados.util._BaseDirectorySpecs.CONFIG)
379         assert dirs._spec.systemd_key == 'CONFIGURATION_DIRECTORY'