1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
13 from pathlib import Path
14 from unittest import mock
20 class KeysetTestHelper:
21 def __init__(self, expect):
25 def fn(self, **kwargs):
26 if self.expect[self.n][0] != kwargs:
27 raise Exception("Didn't match %s != %s" % (self.expect[self.n][0], kwargs))
30 def execute(self, num_retries):
32 return self.expect[self.n-1][1]
35 'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
36 'name': 'KeysetListAllTestCase.test_select mock',
37 'created_at': '2023-08-28T12:34:56.123456Z',
40 class KeysetListAllTestCase(unittest.TestCase):
42 ks = KeysetTestHelper([[
43 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
47 ls = list(arvados.util.keyset_list_all(ks.fn))
48 self.assertEqual(ls, [])
50 def test_oneitem(self):
51 ks = KeysetTestHelper([[
52 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
53 {"items": [{"created_at": "1", "uuid": "1"}]}
55 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]},
58 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]},
62 ls = list(arvados.util.keyset_list_all(ks.fn))
63 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}])
65 def test_onepage2(self):
66 ks = KeysetTestHelper([[
67 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
68 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
70 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
74 ls = list(arvados.util.keyset_list_all(ks.fn))
75 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
77 def test_onepage3(self):
78 ks = KeysetTestHelper([[
79 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
80 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]}
82 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]},
86 ls = list(arvados.util.keyset_list_all(ks.fn))
87 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}])
90 def test_twopage(self):
91 ks = KeysetTestHelper([[
92 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
93 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
95 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
96 {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]}
98 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]},
102 ls = list(arvados.util.keyset_list_all(ks.fn))
103 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
104 {"created_at": "2", "uuid": "2"},
105 {"created_at": "3", "uuid": "3"},
106 {"created_at": "4", "uuid": "4"}
109 def test_repeated_key(self):
110 ks = KeysetTestHelper([[
111 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
112 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]}
114 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]},
115 {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]}
117 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]},
120 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]},
121 {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]}
123 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]},
128 ls = list(arvados.util.keyset_list_all(ks.fn))
129 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
130 {"created_at": "2", "uuid": "2"},
131 {"created_at": "2", "uuid": "3"},
132 {"created_at": "2", "uuid": "4"},
133 {"created_at": "3", "uuid": "5"},
134 {"created_at": "4", "uuid": "6"}
137 def test_onepage_withfilter(self):
138 ks = KeysetTestHelper([[
139 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]},
140 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
142 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]},
146 ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
147 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
149 def test_onepage_desc(self):
150 ks = KeysetTestHelper([[
151 {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
152 {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
154 {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
158 ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
159 self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])
161 @parameterized.parameterized.expand(zip(
162 itertools.cycle(_SELECT_FAKE_ITEM),
163 itertools.chain.from_iterable(
164 itertools.combinations(_SELECT_FAKE_ITEM, count)
165 for count in range(len(_SELECT_FAKE_ITEM) + 1)
168 def test_select(self, order_key, select):
169 # keyset_list_all must have both uuid and order_key to function.
170 # Test that it selects those fields along with user-specified ones.
171 expect_select = {'uuid', order_key, *select}
174 for key, value in _SELECT_FAKE_ITEM.items()
175 if key in expect_select
177 list_func = mock.Mock()
178 list_func().execute = mock.Mock(
185 list_func.reset_mock()
186 actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select)))
187 self.assertEqual(actual, [item])
188 calls = list_func.call_args_list
189 self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
190 for args, kwargs in calls:
191 self.assertEqual(set(kwargs.get('select', ())), expect_select)
194 class TestBaseDirectories:
195 SELF_PATH = Path(__file__)
198 def dir_spec(self, tmp_path):
199 return arvados.util._BaseDirectorySpec(
204 f"{tmp_path / '.test1'}:{tmp_path / '.test2'}",
208 def env(self, tmp_path):
209 return {'HOME': str(tmp_path)}
213 orig_umask = os.umask(0o002)
219 def test_search_systemd_dirs(self, dir_spec, env, tmp_path):
220 env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
221 dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
222 actual = list(dirs.search(self.SELF_PATH.name))
223 assert actual == [self.SELF_PATH]
225 def test_search_xdg_home(self, dir_spec, env, tmp_path):
226 env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent)
227 dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
228 actual = list(dirs.search(self.SELF_PATH.name))
229 assert actual == [self.SELF_PATH]
231 def test_search_xdg_dirs(self, dir_spec, env, tmp_path):
232 env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
233 dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
234 actual = list(dirs.search(self.SELF_PATH.name))
235 assert actual == [self.SELF_PATH]
237 def test_search_all_dirs(self, dir_spec, env, tmp_path):
238 env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
239 env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent)
240 env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
241 dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
242 actual = list(dirs.search(self.SELF_PATH.name))
243 assert actual == [self.SELF_PATH, self.SELF_PATH, self.SELF_PATH]
245 def test_search_default_home(self, dir_spec, env, tmp_path):
246 expected = tmp_path / dir_spec.xdg_home_default / 'default_home'
247 expected.parent.mkdir()
249 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
250 actual = list(dirs.search(expected.name))
251 assert actual == [expected]
253 def test_search_default_dirs(self, dir_spec, env, tmp_path):
254 _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
255 expected = Path(default_dir, 'default_dirs')
256 expected.parent.mkdir()
258 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
259 actual = list(dirs.search(expected.name))
260 assert actual == [expected]
262 def test_search_no_default_dirs(self, dir_spec, env, tmp_path):
263 dir_spec.xdg_dirs_key = None
264 dir_spec.xdg_dirs_default = None
265 for subdir in ['.test1', '.test2', dir_spec.xdg_home_default]:
266 expected = tmp_path / subdir / 'no_dirs'
267 expected.parent.mkdir()
269 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
270 actual = list(dirs.search(expected.name))
271 assert actual == [expected]
273 def test_ignore_relative_directories(self, dir_spec, env, tmp_path):
274 test_path = Path(*self.SELF_PATH.parts[-2:])
275 assert test_path.exists(), "test setup problem: need an existing file in a subdirectory of ."
276 parent_path = str(test_path.parent)
277 env['TEST_DIRECTORY'] = '.'
278 env['XDG_TEST_HOME'] = parent_path
279 env['XDG_TEST_DIRS'] = parent_path
280 dirs = arvados.util._BaseDirectories(dir_spec, env, parent_path)
281 assert not list(dirs.search(test_path.name))
283 def test_search_warns_nondefault_home(self, dir_spec, env, tmp_path, caplog):
284 search_path = tmp_path / dir_spec.xdg_home_default / 'Search' / 'SearchConfig'
285 search_path.parent.mkdir(parents=True)
287 env[dir_spec.xdg_home_key] = str(tmp_path / '.nonexistent')
288 dirs = arvados.util._BaseDirectories(dir_spec, env, search_path.parent.name)
289 results = list(dirs.search(search_path.name))
290 expect_msg = "{} was not found under your configured ${} ({}), but does exist at the default location ({})".format(
291 Path(*search_path.parts[-2:]),
292 dir_spec.xdg_home_key,
293 env[dir_spec.xdg_home_key],
294 Path(*search_path.parts[:-2]),
296 assert caplog.messages
297 assert any(msg.startswith(expect_msg) for msg in caplog.messages)
300 def test_storage_path_systemd(self, dir_spec, env, tmp_path):
301 expected = tmp_path / 'rwsystemd'
302 expected.mkdir(0o700)
303 env['TEST_DIRECTORY'] = str(expected)
304 dirs = arvados.util._BaseDirectories(dir_spec, env)
305 assert dirs.storage_path() == expected
307 def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path):
308 rodir = tmp_path / 'rodir'
310 expected = tmp_path / 'rwdir'
311 expected.mkdir(0o700)
312 env['TEST_DIRECTORY'] = f'{rodir}:{expected}'
313 dirs = arvados.util._BaseDirectories(dir_spec, env)
314 assert dirs.storage_path() == expected
316 def test_storage_path_xdg_home(self, dir_spec, env, tmp_path):
317 expected = tmp_path / '.xdghome' / 'arvados'
318 env['XDG_TEST_HOME'] = str(expected.parent)
319 dirs = arvados.util._BaseDirectories(dir_spec, env)
320 assert dirs.storage_path() == expected
321 exp_mode = stat.S_IFDIR | stat.S_IWUSR
322 assert (expected.stat().st_mode & exp_mode) == exp_mode
324 def test_storage_path_default(self, dir_spec, env, tmp_path):
325 expected = tmp_path / dir_spec.xdg_home_default / 'arvados'
326 dirs = arvados.util._BaseDirectories(dir_spec, env)
327 assert dirs.storage_path() == expected
328 exp_mode = stat.S_IFDIR | stat.S_IWUSR
329 assert (expected.stat().st_mode & exp_mode) == exp_mode
331 @pytest.mark.parametrize('subdir,mode', [
333 (Path('sub', 'path'), 0o770),
335 def test_storage_path_subdir(self, dir_spec, env, umask, tmp_path, subdir, mode):
336 expected = tmp_path / dir_spec.xdg_home_default / 'arvados' / subdir
337 dirs = arvados.util._BaseDirectories(dir_spec, env)
338 actual = dirs.storage_path(subdir, mode)
339 assert actual == expected
340 expect_mode = mode | stat.S_IFDIR
341 actual_mode = actual.stat().st_mode
342 assert (actual_mode & expect_mode) == expect_mode
343 assert not (actual_mode & stat.S_IRWXO)
345 def test_empty_xdg_home(self, dir_spec, env, tmp_path):
346 env['XDG_TEST_HOME'] = ''
347 expected = tmp_path / dir_spec.xdg_home_default / 'emptyhome'
348 dirs = arvados.util._BaseDirectories(dir_spec, env, expected.name)
349 assert dirs.storage_path() == expected
351 def test_empty_xdg_dirs(self, dir_spec, env, tmp_path):
352 env['XDG_TEST_DIRS'] = ''
353 _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
354 expected = Path(default_dir, 'empty_dirs')
355 expected.parent.mkdir()
357 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
358 actual = list(dirs.search(expected.name))
359 assert actual == [expected]
361 def test_spec_key_lookup(self):
362 dirs = arvados.util._BaseDirectories('CACHE')
363 assert dirs._spec.systemd_key == 'CACHE_DIRECTORY'
364 assert dirs._spec.xdg_dirs_key is None
366 def test_spec_enum_lookup(self):
367 dirs = arvados.util._BaseDirectories(arvados.util._BaseDirectorySpecs.CONFIG)
368 assert dirs._spec.systemd_key == 'CONFIGURATION_DIRECTORY'