1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
13 from pathlib import Path
14 from unittest import mock
19 class MkdirDashPTest(unittest.TestCase):
22 os.path.mkdir('./tmp')
27 os.unlink('./tmp/bar')
33 arvados.util.mkdir_dash_p('./tmp/foo')
34 with open('./tmp/bar', 'wb') as f:
36 self.assertRaises(OSError, arvados.util.mkdir_dash_p, './tmp/bar')
class RunCommandTestCase(unittest.TestCase):
    """Checks arvados.util.run_command with one passing and one failing command."""

    def test_success(self):
        """`echo test` must return b"test\\n" on stdout and empty bytes on stderr."""
        command = ['echo', 'test']
        out, err = arvados.util.run_command(command, stderr=subprocess.PIPE)
        self.assertEqual(b"test\n", out)
        self.assertEqual(b"", err)

    def test_failure(self):
        """Running `false` must raise arvados.errors.CommandFailedError."""
        self.assertRaises(
            arvados.errors.CommandFailedError,
            arvados.util.run_command,
            ['false'],
        )
50 class KeysetTestHelper:
51 def __init__(self, expect):
55 def fn(self, **kwargs):
56 if self.expect[self.n][0] != kwargs:
57 raise Exception("Didn't match %s != %s" % (self.expect[self.n][0], kwargs))
60 def execute(self, num_retries):
62 return self.expect[self.n-1][1]
65 'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
66 'name': 'KeysetListAllTestCase.test_select mock',
67 'created_at': '2023-08-28T12:34:56.123456Z',
70 class KeysetListAllTestCase(unittest.TestCase):
72 ks = KeysetTestHelper([[
73 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
77 ls = list(arvados.util.keyset_list_all(ks.fn))
78 self.assertEqual(ls, [])
80 def test_oneitem(self):
81 ks = KeysetTestHelper([[
82 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
83 {"items": [{"created_at": "1", "uuid": "1"}]}
85 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]},
88 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]},
92 ls = list(arvados.util.keyset_list_all(ks.fn))
93 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}])
95 def test_onepage2(self):
96 ks = KeysetTestHelper([[
97 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
98 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
100 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
104 ls = list(arvados.util.keyset_list_all(ks.fn))
105 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
107 def test_onepage3(self):
108 ks = KeysetTestHelper([[
109 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
110 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]}
112 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]},
116 ls = list(arvados.util.keyset_list_all(ks.fn))
117 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}])
120 def test_twopage(self):
121 ks = KeysetTestHelper([[
122 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
123 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
125 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
126 {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]}
128 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]},
132 ls = list(arvados.util.keyset_list_all(ks.fn))
133 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
134 {"created_at": "2", "uuid": "2"},
135 {"created_at": "3", "uuid": "3"},
136 {"created_at": "4", "uuid": "4"}
139 def test_repeated_key(self):
140 ks = KeysetTestHelper([[
141 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
142 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]}
144 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]},
145 {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]}
147 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]},
150 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]},
151 {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]}
153 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]},
158 ls = list(arvados.util.keyset_list_all(ks.fn))
159 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
160 {"created_at": "2", "uuid": "2"},
161 {"created_at": "2", "uuid": "3"},
162 {"created_at": "2", "uuid": "4"},
163 {"created_at": "3", "uuid": "5"},
164 {"created_at": "4", "uuid": "6"}
167 def test_onepage_withfilter(self):
168 ks = KeysetTestHelper([[
169 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]},
170 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
172 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]},
176 ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
177 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
179 def test_onepage_desc(self):
180 ks = KeysetTestHelper([[
181 {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
182 {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
184 {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
188 ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
189 self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])
191 @parameterized.parameterized.expand(zip(
192 itertools.cycle(_SELECT_FAKE_ITEM),
193 itertools.chain.from_iterable(
194 itertools.combinations(_SELECT_FAKE_ITEM, count)
195 for count in range(len(_SELECT_FAKE_ITEM) + 1)
198 def test_select(self, order_key, select):
199 # keyset_list_all must have both uuid and order_key to function.
200 # Test that it selects those fields along with user-specified ones.
201 expect_select = {'uuid', order_key, *select}
204 for key, value in _SELECT_FAKE_ITEM.items()
205 if key in expect_select
207 list_func = mock.Mock()
208 list_func().execute = mock.Mock(
215 list_func.reset_mock()
216 actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select)))
217 self.assertEqual(actual, [item])
218 calls = list_func.call_args_list
219 self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
220 for args, kwargs in calls:
221 self.assertEqual(set(kwargs.get('select', ())), expect_select)
224 class TestBaseDirectories:
225 SELF_PATH = Path(__file__)
228 def dir_spec(self, tmp_path):
229 return arvados.util._BaseDirectorySpec(
234 f"{tmp_path / '.test1'}:{tmp_path / '.test2'}",
238 def env(self, tmp_path):
239 return {'HOME': str(tmp_path)}
243 orig_umask = os.umask(0o002)
def test_search_systemd_dirs(self, dir_spec, env, tmp_path):
    """search() yields this file once when TEST_DIRECTORY lists tmp_path and this file's directory."""
    here = self.SELF_PATH
    # Colon-separated list: an empty temp dir first, then the directory
    # that actually contains this test file.
    env['TEST_DIRECTORY'] = ':'.join(map(str, (tmp_path, here.parent)))
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = [*base_dirs.search(here.name)]
    assert found == [here]
def test_search_xdg_home(self, dir_spec, env, tmp_path):
    """search() yields this file once when XDG_TEST_HOME is this file's grandparent directory."""
    here = self.SELF_PATH
    grandparent = here.parent.parent
    env['XDG_TEST_HOME'] = str(grandparent)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = [*base_dirs.search(here.name)]
    assert found == [here]
def test_search_xdg_dirs(self, dir_spec, env, tmp_path):
    """search() yields this file once when XDG_TEST_DIRS lists tmp_path and this file's grandparent."""
    here = self.SELF_PATH
    grandparent = here.parent.parent
    env['XDG_TEST_DIRS'] = ':'.join(map(str, (tmp_path, grandparent)))
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = [*base_dirs.search(here.name)]
    assert found == [here]
def test_search_all_dirs(self, dir_spec, env, tmp_path):
    """With all three environment variables set, search() yields this file three times."""
    here = self.SELF_PATH
    parent = here.parent
    grandparent = parent.parent
    env.update({
        'TEST_DIRECTORY': f'{tmp_path}:{parent}',
        'XDG_TEST_HOME': str(grandparent),
        'XDG_TEST_DIRS': f'{tmp_path}:{grandparent}',
    })
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = [*base_dirs.search(here.name)]
    # One hit is expected per configured source.
    assert found == [here] * 3
275 def test_search_default_home(self, dir_spec, env, tmp_path):
276 expected = tmp_path / dir_spec.xdg_home_default / 'default_home'
277 expected.parent.mkdir()
279 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
280 actual = list(dirs.search(expected.name))
281 assert actual == [expected]
283 def test_search_default_dirs(self, dir_spec, env, tmp_path):
284 _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
285 expected = Path(default_dir, 'default_dirs')
286 expected.parent.mkdir()
288 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
289 actual = list(dirs.search(expected.name))
290 assert actual == [expected]
292 def test_search_no_default_dirs(self, dir_spec, env, tmp_path):
293 dir_spec.xdg_dirs_key = None
294 dir_spec.xdg_dirs_default = None
295 for subdir in ['.test1', '.test2', dir_spec.xdg_home_default]:
296 expected = tmp_path / subdir / 'no_dirs'
297 expected.parent.mkdir()
299 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
300 actual = list(dirs.search(expected.name))
301 assert actual == [expected]
def test_ignore_relative_directories(self, dir_spec, env, tmp_path):
    """search() returns nothing when every configured directory is a relative path."""
    # Build "<dir>/<file>" from the last two components of this file's path.
    rel_file = Path(*self.SELF_PATH.parts[-2:])
    assert rel_file.exists(), "test setup problem: need an existing file in a subdirectory of ."
    rel_dir = str(rel_file.parent)
    env.update({
        'TEST_DIRECTORY': '.',
        'XDG_TEST_HOME': rel_dir,
        'XDG_TEST_DIRS': rel_dir,
    })
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, rel_dir)
    matches = list(base_dirs.search(rel_file.name))
    assert not matches
313 def test_search_warns_nondefault_home(self, dir_spec, env, tmp_path, caplog):
314 search_path = tmp_path / dir_spec.xdg_home_default / 'Search' / 'SearchConfig'
315 search_path.parent.mkdir(parents=True)
317 env[dir_spec.xdg_home_key] = str(tmp_path / '.nonexistent')
318 dirs = arvados.util._BaseDirectories(dir_spec, env, search_path.parent.name)
319 results = list(dirs.search(search_path.name))
320 expect_msg = "{} was not found under your configured ${} ({}), but does exist at the default location ({})".format(
321 Path(*search_path.parts[-2:]),
322 dir_spec.xdg_home_key,
323 env[dir_spec.xdg_home_key],
324 Path(*search_path.parts[:-2]),
326 assert caplog.messages
327 assert any(msg.startswith(expect_msg) for msg in caplog.messages)
def test_storage_path_systemd(self, dir_spec, env, tmp_path):
    """storage_path() returns the existing directory named by TEST_DIRECTORY."""
    rw_dir = tmp_path.joinpath('rwsystemd')
    rw_dir.mkdir(mode=0o700)
    env['TEST_DIRECTORY'] = str(rw_dir)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    result = base_dirs.storage_path()
    assert result == rw_dir
337 def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path):
338 rodir = tmp_path / 'rodir'
340 expected = tmp_path / 'rwdir'
341 expected.mkdir(0o700)
342 env['TEST_DIRECTORY'] = f'{rodir}:{expected}'
343 dirs = arvados.util._BaseDirectories(dir_spec, env)
344 assert dirs.storage_path() == expected
def test_storage_path_xdg_home(self, dir_spec, env, tmp_path):
    """storage_path() returns `arvados` under XDG_TEST_HOME as an owner-writable directory."""
    xdg_home = tmp_path / '.xdghome'
    wanted = xdg_home / 'arvados'
    env['XDG_TEST_HOME'] = str(xdg_home)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    assert base_dirs.storage_path() == wanted
    # The result must exist on disk with the directory and owner-write bits set.
    required_bits = stat.S_IFDIR | stat.S_IWUSR
    mode_bits = wanted.stat().st_mode
    assert (mode_bits & required_bits) == required_bits
def test_storage_path_default(self, dir_spec, env, tmp_path):
    """storage_path() falls back to `arvados` under the spec's xdg_home_default in tmp_path."""
    wanted = tmp_path.joinpath(dir_spec.xdg_home_default, 'arvados')
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    assert base_dirs.storage_path() == wanted
    # The result must exist on disk with the directory and owner-write bits set.
    required_bits = stat.S_IFDIR | stat.S_IWUSR
    mode_bits = wanted.stat().st_mode
    assert (mode_bits & required_bits) == required_bits
361 @pytest.mark.parametrize('subdir,mode', [
363 (Path('sub', 'path'), 0o770),
def test_storage_path_subdir(self, dir_spec, env, umask, tmp_path, subdir, mode):
    """storage_path(subdir, mode) creates the subdirectory with at least `mode` and no "other" bits.

    The `umask` fixture is requested but never referenced in the body.
    """
    wanted = tmp_path.joinpath(dir_spec.xdg_home_default, 'arvados', subdir)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    result = base_dirs.storage_path(subdir, mode)
    assert result == wanted
    required_bits = stat.S_IFDIR | mode
    mode_bits = result.stat().st_mode
    # Every requested permission bit is present...
    assert (mode_bits & required_bits) == required_bits
    # ...and nothing is granted to "other".
    assert (mode_bits & stat.S_IRWXO) == 0
def test_empty_xdg_home(self, dir_spec, env, tmp_path):
    """An empty XDG_TEST_HOME makes storage_path() use the spec's xdg_home_default location."""
    subdir_name = 'emptyhome'
    env['XDG_TEST_HOME'] = ''
    wanted = tmp_path.joinpath(dir_spec.xdg_home_default, subdir_name)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, subdir_name)
    result = base_dirs.storage_path()
    assert result == wanted
381 def test_empty_xdg_dirs(self, dir_spec, env, tmp_path):
382 env['XDG_TEST_DIRS'] = ''
383 _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
384 expected = Path(default_dir, 'empty_dirs')
385 expected.parent.mkdir()
387 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
388 actual = list(dirs.search(expected.name))
389 assert actual == [expected]
def test_spec_key_lookup(self):
    """Constructing _BaseDirectories from the string 'CACHE' resolves the matching spec."""
    spec = arvados.util._BaseDirectories('CACHE')._spec
    assert spec.systemd_key == 'CACHE_DIRECTORY'
    assert spec.xdg_dirs_key is None
def test_spec_enum_lookup(self):
    """Constructing _BaseDirectories from _BaseDirectorySpecs.CONFIG resolves the matching spec."""
    config_spec = arvados.util._BaseDirectorySpecs.CONFIG
    dirs = arvados.util._BaseDirectories(config_spec)
    systemd_key = dirs._spec.systemd_key
    assert systemd_key == 'CONFIGURATION_DIRECTORY'