1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
13 from pathlib import Path
14 from unittest import mock
# Tests for arvados.util.mkdir_dash_p, working in a scratch directory ./tmp.
# NOTE(review): several lines of this class (including its `def` lines) are
# not visible in this excerpt; the comments below cover only what is shown.
19 class MkdirDashPTest(unittest.TestCase):
# NOTE(review): the standard library's os.path module has no `mkdir`
# function (directory creation is os.mkdir / os.makedirs), so this call
# raises AttributeError unless surrounding code not visible here intercepts
# it. Presumably os.mkdir('./tmp') was intended -- confirm and fix.
22 os.path.mkdir('./tmp')
# Removes the regular file that the test body creates at ./tmp/bar.
27 os.unlink('./tmp/bar')
# Creating ./tmp/foo is expected to succeed.
33 arvados.util.mkdir_dash_p('./tmp/foo')
# ./tmp/bar is opened for binary writing, so it exists as a regular file.
34 with open('./tmp/bar', 'wb') as f:
# Asking for a directory at the path of that regular file must raise OSError.
36 self.assertRaises(OSError, arvados.util.mkdir_dash_p, './tmp/bar')
class RunCommandTestCase(unittest.TestCase):
    """Check arvados.util.run_command with one succeeding and one failing command."""

    def test_success(self):
        # With stderr piped, the call yields a (stdout, stderr) pair that is
        # compared against encoded strings.
        result = arvados.util.run_command(['echo', 'test'], stderr=subprocess.PIPE)
        out, err = result
        self.assertEqual("test\n".encode(), out)
        self.assertEqual("".encode(), err)

    def test_failure(self):
        # Running `false` is expected to raise CommandFailedError.
        self.assertRaises(
            arvados.errors.CommandFailedError,
            arvados.util.run_command,
            ['false'],
        )
# Scripted stand-in for an API "list" method, used to drive
# arvados.util.keyset_list_all in the tests of this file.
# `expect` is indexed as expect[i][0] (the keyword arguments the i-th call
# must receive) and expect[i][1] (the value handed back for that call).
# NOTE(review): the lines that initialise and advance self.n are not visible
# in this excerpt.
50 class KeysetTestHelper:
51 def __init__(self, expect):
# Called with the list request's keyword arguments; they must equal the
# scripted entry at position self.n, otherwise an Exception is raised.
55 def fn(self, **kwargs):
56 if self.expect[self.n][0] != kwargs:
57 raise Exception("Didn't match %s != %s" % (self.expect[self.n][0], kwargs))
# Returns the canned response paired with the previously matched request
# (index self.n-1). num_retries is not used in the visible lines.
60 def execute(self, num_retries):
62 return self.expect[self.n-1][1]
65 'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
66 'name': 'KeysetListAllTestCase.test_select mock',
67 'created_at': '2023-08-28T12:34:56.123456Z',
# Tests for arvados.util.keyset_list_all. Each test scripts the sequence of
# list requests it expects (via KeysetTestHelper) and then checks the
# flattened list of items that keyset_list_all yields.
# NOTE(review): many lines (scripted responses, closing brackets, one `def`
# line) are not visible in this excerpt; comments describe visible lines only.
70 class KeysetListAllTestCase(unittest.TestCase):
# First test (its `def` line is not visible): the initial request uses
# limit 1000, count "none", ascending created_at/uuid order and no filters;
# the overall result is expected to be empty.
72 ks = KeysetTestHelper([[
73 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
77 ls = list(arvados.util.keyset_list_all(ks.fn))
78 self.assertEqual(ls, [])
# A single item on the first page: the follow-up requests first look for more
# rows with the same created_at and a greater uuid, then for rows with a
# greater created_at.
80 def test_oneitem(self):
81 ks = KeysetTestHelper([[
82 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
83 {"items": [{"created_at": "1", "uuid": "1"}]}
85 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]},
88 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]},
92 ls = list(arvados.util.keyset_list_all(ks.fn))
93 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}])
# Two items with distinct created_at values: the next request filters on
# created_at >= the last value while excluding the last uuid.
95 def test_onepage2(self):
96 ks = KeysetTestHelper([[
97 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
98 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
100 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
104 ls = list(arvados.util.keyset_list_all(ks.fn))
105 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
# Same shape as test_onepage2 with three items on the first page.
107 def test_onepage3(self):
108 ks = KeysetTestHelper([[
109 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
110 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]}
112 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]},
116 ls = list(arvados.util.keyset_list_all(ks.fn))
117 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}])
# Two non-empty pages: after each page the filters advance to the last
# created_at/uuid seen, and the result is the concatenation of both pages.
120 def test_twopage(self):
121 ks = KeysetTestHelper([[
122 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
123 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
125 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
126 {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]}
128 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]},
132 ls = list(arvados.util.keyset_list_all(ks.fn))
133 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
134 {"created_at": "2", "uuid": "2"},
135 {"created_at": "3", "uuid": "3"},
136 {"created_at": "4", "uuid": "4"}
# Several rows share created_at "2". The second scripted page repeats uuid "2",
# yet the expected output lists each uuid once; the expected requests then
# switch to created_at = "2" with uuid > "4" before moving on to
# created_at > "2".
139 def test_repeated_key(self):
140 ks = KeysetTestHelper([[
141 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
142 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]}
144 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]},
145 {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]}
147 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]},
150 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]},
151 {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]}
153 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]},
158 ls = list(arvados.util.keyset_list_all(ks.fn))
159 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
160 {"created_at": "2", "uuid": "2"},
161 {"created_at": "2", "uuid": "3"},
162 {"created_at": "2", "uuid": "4"},
163 {"created_at": "3", "uuid": "5"},
164 {"created_at": "4", "uuid": "6"}
# A caller-supplied filter is sent with every request; on the follow-up
# request it appears after the paging filters.
167 def test_onepage_withfilter(self):
168 ks = KeysetTestHelper([[
169 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]},
170 {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
172 {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]},
176 ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
177 self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
# ascending=False: the order clauses become "desc" and the paging filter
# uses "<=" instead of ">=".
179 def test_onepage_desc(self):
180 ks = KeysetTestHelper([[
181 {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
182 {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
184 {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
188 ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
189 self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])
# Parameterized over (order_key, select) pairs: order keys cycle through the
# keys of _SELECT_FAKE_ITEM, while select runs through every combination of
# those keys, from the empty combination up to all of them.
191 @parameterized.parameterized.expand(zip(
192 itertools.cycle(_SELECT_FAKE_ITEM),
193 itertools.chain.from_iterable(
194 itertools.combinations(_SELECT_FAKE_ITEM, count)
195 for count in range(len(_SELECT_FAKE_ITEM) + 1)
198 def test_select(self, order_key, select):
199 # keyset_list_all must have both uuid and order_key to function.
200 # Test that it selects those fields along with user-specified ones.
201 expect_select = {'uuid', order_key, *select}
# The fake item is restricted to the fields named in expect_select.
204 for key, value in _SELECT_FAKE_ITEM.items()
205 if key in expect_select
207 list_func = mock.Mock()
208 list_func().execute = mock.Mock(
# Configuring execute above called list_func once; reset_mock() clears that
# call from the recorded history before the real run.
215 list_func.reset_mock()
216 actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select)))
217 self.assertEqual(actual, [item])
218 calls = list_func.call_args_list
219 self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
# Every recorded request must have asked for exactly the expected field set.
220 for args, kwargs in calls:
221 self.assertEqual(set(kwargs.get('select', ())), expect_select)
# Tests for arvados.util._BaseDirectories, written with plain `assert`
# statements and fixture-style arguments (dir_spec, env, tmp_path).
224 class TestBaseDirectories:
# Path of this test module; the search tests use it as a file that is known
# to exist.
225 SELF_PATH = Path(__file__)
# Builds the arvados.util._BaseDirectorySpec that the tests pass to
# _BaseDirectories.
# NOTE(review): most constructor arguments are not visible in this excerpt.
# The one shown is a colon-separated pair of tmp_path/.test1 and
# tmp_path/.test2 -- presumably the xdg_dirs_default value; confirm.
228 def dir_spec(self, tmp_path):
229 return arvados.util._BaseDirectorySpec(
234 f"{tmp_path / '.test1'}:{tmp_path / '.test2'}",
def env(self, tmp_path):
    """Return a minimal environment mapping: only HOME, set to str(tmp_path)."""
    home = str(tmp_path)
    return {'HOME': home}
def test_search_systemd_dirs(self, dir_spec, env, tmp_path):
    """Search with TEST_DIRECTORY listing tmp_path and this file's directory.

    Exactly one hit is expected: this test file itself.
    """
    env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
    wanted_name = self.SELF_PATH.name
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = list(base_dirs.search(wanted_name))
    assert found == [self.SELF_PATH]
def test_search_xdg_home(self, dir_spec, env, tmp_path):
    """Search with XDG_TEST_HOME pointing two levels above this file.

    Together with the 'tests' argument, the search is expected to return
    this test file as its only result.
    """
    two_up = self.SELF_PATH.parent.parent
    env['XDG_TEST_HOME'] = str(two_up)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = list(base_dirs.search(self.SELF_PATH.name))
    assert found == [self.SELF_PATH]
def test_search_xdg_dirs(self, dir_spec, env, tmp_path):
    """Search with XDG_TEST_DIRS listing tmp_path and two levels above this file.

    Exactly one hit is expected: this test file itself.
    """
    env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
    wanted_name = self.SELF_PATH.name
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = list(base_dirs.search(wanted_name))
    assert found == [self.SELF_PATH]
def test_search_all_dirs(self, dir_spec, env, tmp_path):
    """Set all three variables at once and expect this file once per variable."""
    two_up = self.SELF_PATH.parent.parent
    env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
    env['XDG_TEST_HOME'] = str(two_up)
    env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
    found = list(base_dirs.search(self.SELF_PATH.name))
    # Hits are not de-duplicated: the same path is expected three times.
    assert found == [self.SELF_PATH] * 3
# With no additional variables set in the visible lines, a file named
# 'default_home' under tmp_path / dir_spec.xdg_home_default is expected to be
# the only search result.
# NOTE(review): a line between the mkdir and the search is not visible in
# this excerpt -- presumably it creates the file itself; confirm.
267 def test_search_default_home(self, dir_spec, env, tmp_path):
268 expected = tmp_path / dir_spec.xdg_home_default / 'default_home'
269 expected.parent.mkdir()
# '.' is passed as the third argument here, where other tests pass 'tests'.
271 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
272 actual = list(dirs.search(expected.name))
273 assert actual == [expected]
# Uses the last colon-separated entry of dir_spec.xdg_dirs_default as the
# directory in which a file named 'default_dirs' is expected to be found.
# NOTE(review): a line between the mkdir and the search is not visible in
# this excerpt -- presumably it creates the file itself; confirm.
275 def test_search_default_dirs(self, dir_spec, env, tmp_path):
# rpartition(':') keeps only the text after the final colon.
276 _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
277 expected = Path(default_dir, 'default_dirs')
278 expected.parent.mkdir()
280 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
281 actual = list(dirs.search(expected.name))
282 assert actual == [expected]
# Clears both the xdg_dirs key and its default on the spec, then prepares
# directories '.test1', '.test2' and dir_spec.xdg_home_default under tmp_path.
# The final assertion compares the search result with `expected` as left by
# the last loop iteration, i.e. the path under dir_spec.xdg_home_default.
# NOTE(review): a line after the mkdir is not visible in this excerpt --
# presumably it creates the 'no_dirs' file; confirm, including whether it
# sits inside the loop.
284 def test_search_no_default_dirs(self, dir_spec, env, tmp_path):
285 dir_spec.xdg_dirs_key = None
286 dir_spec.xdg_dirs_default = None
287 for subdir in ['.test1', '.test2', dir_spec.xdg_home_default]:
288 expected = tmp_path / subdir / 'no_dirs'
289 expected.parent.mkdir()
291 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
292 actual = list(dirs.search(expected.name))
293 assert actual == [expected]
def test_ignore_relative_directories(self, dir_spec, env, tmp_path):
    """Relative paths in every variable must produce no search results.

    The relative path is built from the last two components of this file's
    path and must exist relative to the current directory for the test to
    be meaningful.
    """
    relative_file = Path(*self.SELF_PATH.parts[-2:])
    assert relative_file.exists(), "test setup problem: need an existing file in a subdirectory of ."
    relative_dir = str(relative_file.parent)
    env.update({
        'TEST_DIRECTORY': '.',
        'XDG_TEST_HOME': relative_dir,
        'XDG_TEST_DIRS': relative_dir,
    })
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, relative_dir)
    found = list(base_dirs.search(relative_file.name))
    assert found == []
def test_storage_path_systemd(self, dir_spec, env, tmp_path):
    """storage_path() returns the directory named by TEST_DIRECTORY.

    The directory is created up front with mode 0o700.
    """
    rw_dir = tmp_path.joinpath('rwsystemd')
    rw_dir.mkdir(mode=0o700)
    env['TEST_DIRECTORY'] = str(rw_dir)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    chosen = base_dirs.storage_path()
    assert chosen == rw_dir
# TEST_DIRECTORY lists two directories, rodir first and rwdir second;
# storage_path() is expected to return rwdir, which is created with mode 0o700.
# NOTE(review): the line that sets up rodir is not visible in this excerpt --
# presumably it is created without write permission; confirm.
312 def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path):
313 rodir = tmp_path / 'rodir'
315 expected = tmp_path / 'rwdir'
316 expected.mkdir(0o700)
317 env['TEST_DIRECTORY'] = f'{rodir}:{expected}'
318 dirs = arvados.util._BaseDirectories(dir_spec, env)
319 assert dirs.storage_path() == expected
def test_storage_path_xdg_home(self, dir_spec, env, tmp_path):
    """storage_path() yields <XDG_TEST_HOME>/arvados as a user-writable directory.

    The test does not create the directory itself; the stat() call after
    storage_path() checks that it exists with the required mode bits.
    """
    xdg_home = tmp_path / '.xdghome'
    wanted = xdg_home / 'arvados'
    env['XDG_TEST_HOME'] = str(xdg_home)
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    assert base_dirs.storage_path() == wanted
    # Both the directory type bit and the owner-write bit must be set.
    required_bits = stat.S_IFDIR | stat.S_IWUSR
    actual_mode = wanted.stat().st_mode
    assert (actual_mode & required_bits) == required_bits
def test_storage_path_default(self, dir_spec, env, tmp_path):
    """With no variable set by this test, storage_path() uses the spec's home default.

    The expected location is tmp_path / dir_spec.xdg_home_default / 'arvados',
    and it must exist afterwards as a user-writable directory.
    """
    default_home = tmp_path / dir_spec.xdg_home_default
    wanted = default_home / 'arvados'
    base_dirs = arvados.util._BaseDirectories(dir_spec, env)
    assert base_dirs.storage_path() == wanted
    # Both the directory type bit and the owner-write bit must be set.
    required_bits = stat.S_IFDIR | stat.S_IWUSR
    actual_mode = wanted.stat().st_mode
    assert (actual_mode & required_bits) == required_bits
def test_empty_xdg_home(self, dir_spec, env, tmp_path):
    """An empty XDG_TEST_HOME is expected to fall back to the spec's home default."""
    env['XDG_TEST_HOME'] = ''
    subdir_name = 'emptyhome'
    wanted = tmp_path / dir_spec.xdg_home_default / subdir_name
    base_dirs = arvados.util._BaseDirectories(dir_spec, env, subdir_name)
    chosen = base_dirs.storage_path()
    assert chosen == wanted
# An empty XDG_TEST_DIRS value is set, and the file 'empty_dirs' is expected
# to be found in the last colon-separated entry of dir_spec.xdg_dirs_default.
# NOTE(review): a line between the mkdir and the search is not visible in
# this excerpt -- presumably it creates the file itself; confirm.
342 def test_empty_xdg_dirs(self, dir_spec, env, tmp_path):
343 env['XDG_TEST_DIRS'] = ''
# rpartition(':') keeps only the text after the final colon.
344 _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
345 expected = Path(default_dir, 'empty_dirs')
346 expected.parent.mkdir()
348 dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
349 actual = list(dirs.search(expected.name))
350 assert actual == [expected]
def test_spec_key_lookup(self):
    """Constructing from the string 'CACHE' resolves to the matching spec.

    The resolved spec has systemd_key 'CACHE_DIRECTORY' and no xdg_dirs_key.
    """
    base_dirs = arvados.util._BaseDirectories('CACHE')
    systemd_key = base_dirs._spec.systemd_key
    assert systemd_key == 'CACHE_DIRECTORY'
    xdg_dirs_key = base_dirs._spec.xdg_dirs_key
    assert xdg_dirs_key is None
# Constructing _BaseDirectories from arvados.util._BaseDirectorySpecs.CONFIG
# must give a spec whose systemd_key is 'CONFIGURATION_DIRECTORY'.
# NOTE(review): this is the last definition visible in the excerpt; further
# assertions may follow.
357 def test_spec_enum_lookup(self):
358 dirs = arvados.util._BaseDirectories(arvados.util._BaseDirectorySpecs.CONFIG)
359 assert dirs._spec.systemd_key == 'CONFIGURATION_DIRECTORY'