1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
13 from pathlib import Path
14 from unittest import mock
# Scripted stand-in for an API list method. The tests below pass `fn` to
# arvados.util.keyset_list_all: `fn` checks the request arguments and
# `execute` hands back a canned response.
20 class KeysetTestHelper:
# expect: sequence of [expected_kwargs, response] pairs (read below as
# expect[i][0] and expect[i][1]).
# expect_num_retries: the num_retries value every execute() call must receive.
21 def __init__(self, expect, expect_num_retries=0):
24 self.expect_num_retries = expect_num_retries
# Assert that the keyword arguments equal the expected kwargs of entry self.n.
26 def fn(self, **kwargs):
27 assert kwargs == self.expect[self.n][0]
# Assert the retry count, then return the response stored at entry self.n - 1.
# NOTE(review): presumably self.n is advanced just before the return; the
# statement doing so is not visible here, confirm.
30 def execute(self, num_retries):
31 assert num_retries == self.expect_num_retries
33 return self.expect[self.n-1][1]
# Fields of a fake record used as test data (uuid, name, created_at).
# NOTE(review): presumably the body of _SELECT_FAKE_ITEM, which test_select
# references; the opening line is not visible here, confirm.
36 'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
37 'name': 'KeysetListAllTestCase.test_select mock',
38 'created_at': '2023-08-28T12:34:56.123456Z',
# Fake record with user_uuid, target_uuid and perm_level fields. test_select
# pairs it with ('user_uuid', 'target_uuid') as key fields and never uses
# 'perm_level' as the order key.
41 _FAKE_COMPUTED_PERMISSIONS_ITEM = {
42 'user_uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
43 'target_uuid': 'zzzzz-ttttt-xxxxxyyyyyzzzzz',
44 'perm_level': 'can_write',
# Tests for arvados.util.keyset_list_all. Each test scripts the expected
# requests and canned responses with KeysetTestHelper and checks the items
# that come back.
47 class KeysetListAllTestCase(unittest.TestCase):
# The visible script starts with an unfiltered request ordered by
# created_at then uuid; the resulting listing is expected to be empty.
49 ks = KeysetTestHelper([[
50 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
54 ls = list(arvados.util.keyset_list_all(ks.fn))
55 self.assertEqual(ls, [])
# One item in the first response. The scripted follow-up requests first ask
# for further uuids at the same created_at ("=" "1", uuid ">" "1") and then
# for later created_at values (">" "1").
57 def test_oneitem(self):
58 ks = KeysetTestHelper([[
59 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
60 {"items": [{"created_at": "1", "uuid": "1"}]}
62 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]},
65 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]},
# The listing must contain exactly the single scripted item.
69 ls = list(arvados.util.keyset_list_all(ks.fn))
70 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}])
# Two items with distinct created_at values in the first response. The
# scripted follow-up request filters on the last item: created_at ">=" "2"
# and uuid "!=" "2".
72 def test_onepage2(self):
73 ks = KeysetTestHelper([[
74 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
75 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
77 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
# Both items are expected, in the order they were returned.
81 ls = list(arvados.util.keyset_list_all(ks.fn))
82 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
# Three items with distinct created_at values in the first response. The
# scripted follow-up request filters on the last item: created_at ">=" "3"
# and uuid "!=" "3".
84 def test_onepage3(self):
85 ks = KeysetTestHelper([[
86 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
87 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]}
89 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]},
# All three items are expected, in the order they were returned.
93 ls = list(arvados.util.keyset_list_all(ks.fn))
94 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}])
# Two responses of two items each. After each response the next scripted
# request filters on that response's last item (created_at ">=" its value,
# uuid "!=" its uuid).
97 def test_twopage(self):
98 ks = KeysetTestHelper([[
99 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
100 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
102 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
103 {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]}
105 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]},
# The listing must be the four items from both responses, in order.
109 ls = list(arvados.util.keyset_list_all(ks.fn))
110 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
111 {"created_at": "2", "uuid": "2"},
112 {"created_at": "3", "uuid": "3"},
113 {"created_at": "4", "uuid": "4"}
# Several items share created_at "2". The second scripted response repeats
# the item with uuid "2", yet the expected listing contains every item once.
116 def test_repeated_key(self):
117 ks = KeysetTestHelper([[
118 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
119 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]}
# Follow-up after a response ending in a repeated created_at: ">=" "2",
# excluding only the last uuid ("3").
121 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]},
122 {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]}
# Next: remaining uuids at created_at "2", then created_at values after "2".
124 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]},
127 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]},
128 {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]}
130 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]},
# Expected listing: uuids "1" through "6", each exactly once.
135 ls = list(arvados.util.keyset_list_all(ks.fn))
136 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
137 {"created_at": "2", "uuid": "2"},
138 {"created_at": "2", "uuid": "3"},
139 {"created_at": "2", "uuid": "4"},
140 {"created_at": "3", "uuid": "5"},
141 {"created_at": "4", "uuid": "6"}
# A caller-supplied filter (["foo", ">", "bar"]) is expected unchanged on the
# first request, and after the created_at/uuid filters on the follow-up.
144 def test_onepage_withfilter(self):
145 ks = KeysetTestHelper([[
146 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]},
147 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
149 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]},
# Both scripted items are expected back.
153 ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
154 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
# With ascending=False the scripted requests use "desc" ordering, and the
# follow-up compares created_at with "<=" against the last item returned.
156 def test_onepage_desc(self):
157 ks = KeysetTestHelper([[
158 {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
159 {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
161 {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
# Items are expected in the descending order in which they were returned.
165 ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
166 self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])
# Parameter sets: for each fake item and its key fields, every field of the
# item except 'perm_level' as the order key, combined with every combination
# of the item's field names (sizes 0 through all fields) as `select`.
168 @parameterized.parameterized.expand(
169 (fake_item, key_fields, order_key, select)
170 for (fake_item, key_fields) in [
171 (_SELECT_FAKE_ITEM, ('uuid',)),
172 (_FAKE_COMPUTED_PERMISSIONS_ITEM, ('user_uuid', 'target_uuid')),
174 for order_key in fake_item
175 if order_key != 'perm_level'
176 for count in range(len(fake_item) + 1)
177 for select in itertools.combinations(fake_item, count)
179 def test_select(self, fake_item, key_fields, order_key, select):
180 # keyset_list_all must have both uuid and order_key to function.
181 # Test that it selects those fields along with user-specified ones.
# Fields every request is expected to select: the key fields, the order key
# and whatever the caller asked for.
182 expect_select = {*key_fields, order_key, *select}
# Restrict the fake item to the selected fields.
# NOTE(review): presumably this comprehension builds `item`, which is compared
# below; its opening line is not visible here, confirm.
185 for key, value in fake_item.items()
186 if key in expect_select
# Mock list function. Calling list_func() while configuring execute is
# recorded by the mock, so reset_mock() clears that call before the real run.
188 list_func = mock.Mock()
189 list_func().execute = mock.Mock(
196 list_func.reset_mock()
197 actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select), key_fields=key_fields))
198 self.assertEqual(actual, [item])
# At least two calls are required, and each call's `select` argument, taken
# as a set, must equal expect_select.
199 calls = list_func.call_args_list
200 self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
201 for args, kwargs in calls:
202 self.assertEqual(set(kwargs.get('select', ())), expect_select)
# Tests for arvados.util._BaseDirectories: searching for files and resolving
# storage paths from an environment mapping.
205 class TestBaseDirectories:
# Path of this test module; the search tests look it up by name.
206 SELF_PATH = Path(__file__)
# Builds the arvados.util._BaseDirectorySpec used by these tests. One of its
# arguments is a colon-separated pair of paths under tmp_path ('.test1' and
# '.test2').
# NOTE(review): presumably a pytest fixture, since the tests take `dir_spec`
# as a parameter; the decorator and the other arguments are not visible here,
# confirm.
209 def dir_spec(self, tmp_path):
210 return arvados.util._BaseDirectorySpec(
215 f"{tmp_path / '.test1'}:{tmp_path / '.test2'}",
def env(self, tmp_path):
    # Minimal environment mapping for the tests: only HOME, set to the
    # string form of tmp_path.
    home_dir = str(tmp_path)
    return dict(HOME=home_dir)
# Set the process umask to 0o002, keeping the previous value in orig_umask.
# NOTE(review): presumably part of the `umask` fixture requested by
# test_storage_path_subdir, with the previous value restored afterwards;
# the surrounding lines are not visible here, confirm.
224 orig_umask = os.umask(0o002)
def test_search_systemd_dirs(self, dir_spec, env, tmp_path):
    # TEST_DIRECTORY lists tmp_path and then this file's directory; searching
    # for this file's name must yield exactly this file.
    this_file = self.SELF_PATH
    env['TEST_DIRECTORY'] = ':'.join([str(tmp_path), str(this_file.parent)])
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = [*base_dirs.search(this_file.name)]
    assert found == [this_file]
def test_search_xdg_home(self, dir_spec, env, tmp_path):
    # XDG_TEST_HOME points two levels above this file and 'tests' is passed
    # as the third constructor argument; the search must yield this file.
    this_file = self.SELF_PATH
    xdg_home = this_file.parent.parent
    env['XDG_TEST_HOME'] = str(xdg_home)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = [*base_dirs.search(this_file.name)]
    assert found == [this_file]
def test_search_xdg_dirs(self, dir_spec, env, tmp_path):
    # XDG_TEST_DIRS lists tmp_path and then the directory two levels above
    # this file; the search must yield exactly this file.
    this_file = self.SELF_PATH
    env['XDG_TEST_DIRS'] = '{}:{}'.format(tmp_path, this_file.parent.parent)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = [*base_dirs.search(this_file.name)]
    assert found == [this_file]
def test_search_all_dirs(self, dir_spec, env, tmp_path):
    # With all three variables pointing at locations that contain this file,
    # the search is expected to report it three times.
    this_file = self.SELF_PATH
    tests_dir = this_file.parent
    above_tests = tests_dir.parent
    env.update({
        'TEST_DIRECTORY': f'{tmp_path}:{tests_dir}',
        'XDG_TEST_HOME': str(above_tests),
        'XDG_TEST_DIRS': f'{tmp_path}:{above_tests}',
    })
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = [*base_dirs.search(this_file.name)]
    assert found == [this_file] * 3
# Expects search('default_home') to return exactly
# tmp_path / <xdg_home_default> / 'default_home'.
256 def test_search_default_home(self, dir_spec, env, tmp_path):
257 expected = tmp_path / dir_spec.xdg_home_default / 'default_home'
# Create the directory that is to contain the expected path.
258 expected.parent.mkdir()
260 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
261 actual = list(dirs.search(expected.name))
262 assert actual == [expected]
# Expects search('default_dirs') to return exactly the path of that name
# under the last entry of the spec's xdg_dirs_default.
264 def test_search_default_dirs(self, dir_spec, env, tmp_path):
# rpartition(':') keeps the text after the last colon (the whole string when
# there is no colon).
265 _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
266 expected = Path(default_dir, 'default_dirs')
267 expected.parent.mkdir()
269 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
270 actual = list(dirs.search(expected.name))
271 assert actual == [expected]
# Clears the spec's xdg_dirs_key and xdg_dirs_default, then expects the
# search for 'no_dirs' to return a single path.
273 def test_search_no_default_dirs(self, dir_spec, env, tmp_path):
274 dir_spec.xdg_dirs_key = None
275 dir_spec.xdg_dirs_default = None
# Prepare a 'no_dirs' location under '.test1', '.test2' and xdg_home_default.
# NOTE(review): indentation is not visible here; presumably only the path
# setup is inside the loop, leaving `expected` as the xdg_home_default entry
# (the last loop value) for the assertion. Confirm.
276 for subdir in ['.test1', '.test2', dir_spec.xdg_home_default]:
277 expected = tmp_path / subdir / 'no_dirs'
278 expected.parent.mkdir()
280 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
281 actual = list(dirs.search(expected.name))
282 assert actual == [expected]
def test_ignore_relative_directories(self, dir_spec, env, tmp_path):
    # Last two path components of this file (parent directory and file name).
    rel_file = Path(*self.SELF_PATH.parts[-2:])
    assert rel_file.exists(), "test setup problem: need an existing file in a subdirectory of ."
    rel_dir = str(rel_file.parent)
    # Every configured location is given in relative form; the search is
    # expected to return nothing even though the file exists.
    env['TEST_DIRECTORY'] = '.'
    for xdg_key in ('XDG_TEST_HOME', 'XDG_TEST_DIRS'):
        env[xdg_key] = rel_dir
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, rel_dir)
    found = list(base_dirs.search(rel_file.name))
    assert not found
# A file location is prepared under the default home while the home variable
# points at a '.nonexistent' path; a log message starting with expect_msg is
# expected.
294 def test_search_warns_nondefault_home(self, dir_spec, env, tmp_path, caplog):
295 search_path = tmp_path / dir_spec.xdg_home_default / 'Search' / 'SearchConfig'
296 search_path.parent.mkdir(parents=True)
# Point the spec's home variable at a path under tmp_path named '.nonexistent'.
298 env[dir_spec.xdg_home_key] = str(tmp_path / '.nonexistent')
299 dirs = arvados.util._BaseDirectories(dir_spec, env, search_path.parent.name)
300 results = list(dirs.search(search_path.name))
# Expected message prefix: the last two components of search_path, the home
# variable name and its configured value, then search_path without its last
# two components.
301 expect_msg = "{} was not found under your configured ${} ({}), but does exist at the default location ({})".format(
302 Path(*search_path.parts[-2:]),
303 dir_spec.xdg_home_key,
304 env[dir_spec.xdg_home_key],
305 Path(*search_path.parts[:-2]),
# Something must have been logged, and at least one message must start with
# the expected text.
307 assert caplog.messages
308 assert any(msg.startswith(expect_msg) for msg in caplog.messages)
def test_storage_path_systemd(self, dir_spec, env, tmp_path):
    # A directory created with mode 0o700 and named by TEST_DIRECTORY is
    # expected back from storage_path().
    storage_dir = tmp_path.joinpath('rwsystemd')
    storage_dir.mkdir(mode=0o700)
    env['TEST_DIRECTORY'] = str(storage_dir)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    actual = base_dirs.storage_path()
    assert actual == storage_dir
# TEST_DIRECTORY lists `rodir` first and a directory created with mode 0o700
# second; storage_path() is expected to return the second.
# NOTE(review): presumably `rodir` is created without write permission in a
# line not visible here, confirm.
318 def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path):
319 rodir = tmp_path / 'rodir'
321 expected = tmp_path / 'rwdir'
322 expected.mkdir(0o700)
323 env['TEST_DIRECTORY'] = f'{rodir}:{expected}'
324 dirs = arvados.util._BaseDirectories(dir_spec, env)
325 assert dirs.storage_path() == expected
def test_storage_path_xdg_home(self, dir_spec, env, tmp_path):
    # With XDG_TEST_HOME set to <tmp_path>/.xdghome, storage_path() is
    # expected to return its 'arvados' child.
    xdg_home = tmp_path / '.xdghome'
    storage_dir = xdg_home / 'arvados'
    env['XDG_TEST_HOME'] = str(xdg_home)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    assert base_dirs.storage_path() == storage_dir
    # The returned path's mode must carry both the S_IFDIR and S_IWUSR bits.
    required_bits = stat.S_IFDIR | stat.S_IWUSR
    actual_bits = storage_dir.stat().st_mode
    assert (actual_bits & required_bits) == required_bits
def test_storage_path_default(self, dir_spec, env, tmp_path):
    # No location variable is added to env here, so the storage directory is
    # expected at <tmp_path>/<xdg_home_default>/arvados.
    storage_dir = tmp_path.joinpath(dir_spec.xdg_home_default, 'arvados')
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    assert base_dirs.storage_path() == storage_dir
    # The returned path's mode must carry both the S_IFDIR and S_IWUSR bits.
    required_bits = stat.S_IFDIR | stat.S_IWUSR
    actual_bits = storage_dir.stat().st_mode
    assert (actual_bits & required_bits) == required_bits
# Parametrized over (subdir, mode) pairs; the visible case is the nested
# path 'sub/path' with mode 0o770.
342 @pytest.mark.parametrize('subdir,mode', [
344 (Path('sub', 'path'), 0o770),
# storage_path(subdir, mode) is expected to return
# <tmp_path>/<xdg_home_default>/arvados/<subdir>.
346 def test_storage_path_subdir(self, dir_spec, env, umask, tmp_path, subdir, mode):
347 expected = tmp_path / dir_spec.xdg_home_default / 'arvados' / subdir
348 dirs = arvados.util._BaseDirectories(dir_spec, env)
349 actual = dirs.storage_path(subdir, mode)
350 assert actual == expected
# Every requested mode bit plus S_IFDIR must be set on the result, and none
# of the S_IRWXO ("other") permission bits.
351 expect_mode = mode | stat.S_IFDIR
352 actual_mode = actual.stat().st_mode
353 assert (actual_mode & expect_mode) == expect_mode
354 assert not (actual_mode & stat.S_IRWXO)
def test_empty_xdg_home(self, dir_spec, env, tmp_path):
    # With XDG_TEST_HOME set to the empty string, storage_path() is expected
    # under the spec's xdg_home_default inside tmp_path.
    subdir_name = 'emptyhome'
    env['XDG_TEST_HOME'] = ''
    default_home = tmp_path / dir_spec.xdg_home_default
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, subdir_name)
    actual = base_dirs.storage_path()
    assert actual == default_home / subdir_name
# With XDG_TEST_DIRS set to the empty string, search('empty_dirs') is
# expected to return exactly the path of that name under the last entry of
# the spec's xdg_dirs_default.
362 def test_empty_xdg_dirs(self, dir_spec, env, tmp_path):
363 env['XDG_TEST_DIRS'] = ''
# rpartition(':') keeps the text after the last colon (the whole string when
# there is no colon).
364 _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
365 expected = Path(default_dir, 'empty_dirs')
366 expected.parent.mkdir()
368 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
369 actual = list(dirs.search(expected.name))
370 assert actual == [expected]
def test_spec_key_lookup(self):
    # Constructing from the string 'CACHE' is expected to give a spec whose
    # systemd_key is 'CACHE_DIRECTORY' and whose xdg_dirs_key is None.
    spec = arvados.util._BaseDirectories('CACHE')._spec
    assert spec.systemd_key == 'CACHE_DIRECTORY'
    assert spec.xdg_dirs_key is None
# Constructing from the _BaseDirectorySpecs.CONFIG member is expected to
# give a spec whose systemd_key is 'CONFIGURATION_DIRECTORY'.
377 def test_spec_enum_lookup(self):
378 dirs = arvados.util._BaseDirectories(arvados.util._BaseDirectorySpecs.CONFIG)
379 assert dirs._spec.systemd_key == 'CONFIGURATION_DIRECTORY'