21388: Update deb instructions in Python READMEs
[arvados.git] / sdk / python / tests / test_util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import itertools
6 import os
7 import stat
8 import subprocess
9 import unittest
10
11 import parameterized
12 import pytest
13 from pathlib import Path
14 from unittest import mock
15
16 import arvados
17 import arvados.util
18
19 class MkdirDashPTest(unittest.TestCase):
20     def setUp(self):
21         try:
22             os.path.mkdir('./tmp')
23         except:
24             pass
25     def tearDown(self):
26         try:
27             os.unlink('./tmp/bar')
28             os.rmdir('./tmp/foo')
29             os.rmdir('./tmp')
30         except:
31             pass
32     def runTest(self):
33         arvados.util.mkdir_dash_p('./tmp/foo')
34         with open('./tmp/bar', 'wb') as f:
35             f.write(b'bar')
36         self.assertRaises(OSError, arvados.util.mkdir_dash_p, './tmp/bar')
37
38
class RunCommandTestCase(unittest.TestCase):
    """Cover the success and failure paths of arvados.util.run_command."""

    def test_success(self):
        # With stderr piped, the call hands back an (stdout, stderr) pair of bytes.
        result = arvados.util.run_command(['echo', 'test'], stderr=subprocess.PIPE)
        out_bytes, err_bytes = result
        self.assertEqual(b"test\n", out_bytes)
        self.assertEqual(b"", err_bytes)

    def test_failure(self):
        # `false` exits non-zero; that is expected to surface as CommandFailedError.
        self.assertRaises(
            arvados.errors.CommandFailedError,
            arvados.util.run_command,
            ['false'],
        )
49
class KeysetTestHelper:
    """Scripted stand-in for an API list method.

    ``expect`` is a sequence of ``[expected_kwargs, response]`` entries.
    ``fn`` checks the keyword arguments of the current entry and ``execute``
    returns that entry's response and advances to the next one.
    """

    def __init__(self, expect):
        self.expect = expect
        # Index of the scripted entry the next fn()/execute() pair will use.
        self.n = 0

    def fn(self, **kwargs):
        """Verify ``kwargs`` against the current entry and return ``self``.

        Raises:
            Exception: if the keyword arguments differ from the scripted ones.
        """
        wanted = self.expect[self.n][0]
        if wanted == kwargs:
            return self
        raise Exception("Didn't match %s != %s" % (wanted, kwargs))

    def execute(self, num_retries):
        """Return the current entry's response and move on to the next entry.

        ``num_retries`` is accepted for call compatibility and is not used.
        """
        current = self.n
        # Advance before indexing so the counter moves even if the lookup fails.
        self.n = current + 1
        return self.expect[current][1]
63
# Fake API record that drives KeysetListAllTestCase.test_select: its keys
# supply both the candidate order keys and the candidate `select` fields,
# and its values are what the mocked list call hands back.
_SELECT_FAKE_ITEM = {
    'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
    'name': 'KeysetListAllTestCase.test_select mock',
    'created_at': '2023-08-28T12:34:56.123456Z',
}
69
class KeysetListAllTestCase(unittest.TestCase):
    """Run arvados.util.keyset_list_all against scripted list responses.

    Each test scripts the exact sequence of list requests it expects
    (keyword arguments paired with the response to hand back) and then
    checks the items that come out of keyset_list_all.
    """

    @staticmethod
    def _row(created_at, uuid):
        # Minimal record carrying only the two fields these tests page on.
        return {"created_at": created_at, "uuid": uuid}

    @staticmethod
    def _step(filters, items, direction="asc"):
        # One scripted exchange: the expected request kwargs and its response.
        request = {
            "limit": 1000,
            "count": "none",
            "order": ["created_at " + direction, "uuid " + direction],
            "filters": filters,
        }
        return [request, {"items": items}]

    def test_empty(self):
        script = KeysetTestHelper([
            self._step([], []),
        ])

        got = list(arvados.util.keyset_list_all(script.fn))
        self.assertEqual(got, [])

    def test_oneitem(self):
        script = KeysetTestHelper([
            self._step([], [self._row("1", "1")]),
            self._step([["created_at", "=", "1"], ["uuid", ">", "1"]], []),
            self._step([["created_at", ">", "1"]], []),
        ])

        got = list(arvados.util.keyset_list_all(script.fn))
        self.assertEqual(got, [self._row("1", "1")])

    def test_onepage2(self):
        script = KeysetTestHelper([
            self._step([], [self._row("1", "1"), self._row("2", "2")]),
            self._step([["created_at", ">=", "2"], ["uuid", "!=", "2"]], []),
        ])

        got = list(arvados.util.keyset_list_all(script.fn))
        self.assertEqual(got, [self._row("1", "1"), self._row("2", "2")])

    def test_onepage3(self):
        script = KeysetTestHelper([
            self._step([], [
                self._row("1", "1"),
                self._row("2", "2"),
                self._row("3", "3"),
            ]),
            self._step([["created_at", ">=", "3"], ["uuid", "!=", "3"]], []),
        ])

        got = list(arvados.util.keyset_list_all(script.fn))
        self.assertEqual(got, [
            self._row("1", "1"),
            self._row("2", "2"),
            self._row("3", "3"),
        ])

    def test_twopage(self):
        script = KeysetTestHelper([
            self._step([], [self._row("1", "1"), self._row("2", "2")]),
            self._step(
                [["created_at", ">=", "2"], ["uuid", "!=", "2"]],
                [self._row("3", "3"), self._row("4", "4")],
            ),
            self._step([["created_at", ">=", "4"], ["uuid", "!=", "4"]], []),
        ])

        got = list(arvados.util.keyset_list_all(script.fn))
        self.assertEqual(got, [
            self._row("1", "1"),
            self._row("2", "2"),
            self._row("3", "3"),
            self._row("4", "4"),
        ])

    def test_repeated_key(self):
        # The first page ends inside a run of records sharing created_at "2".
        script = KeysetTestHelper([
            self._step([], [
                self._row("1", "1"),
                self._row("2", "2"),
                self._row("2", "3"),
            ]),
            self._step(
                [["created_at", ">=", "2"], ["uuid", "!=", "3"]],
                [self._row("2", "2"), self._row("2", "4")],
            ),
            self._step([["created_at", "=", "2"], ["uuid", ">", "4"]], []),
            self._step(
                [["created_at", ">", "2"]],
                [self._row("3", "5"), self._row("4", "6")],
            ),
            self._step([["created_at", ">=", "4"], ["uuid", "!=", "6"]], []),
        ])

        got = list(arvados.util.keyset_list_all(script.fn))
        self.assertEqual(got, [
            self._row("1", "1"),
            self._row("2", "2"),
            self._row("2", "3"),
            self._row("2", "4"),
            self._row("3", "5"),
            self._row("4", "6"),
        ])

    def test_onepage_withfilter(self):
        # The caller's filter is expected after the paging filters.
        script = KeysetTestHelper([
            self._step(
                [["foo", ">", "bar"]],
                [self._row("1", "1"), self._row("2", "2")],
            ),
            self._step(
                [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]],
                [],
            ),
        ])

        got = list(arvados.util.keyset_list_all(script.fn, filters=[["foo", ">", "bar"]]))
        self.assertEqual(got, [self._row("1", "1"), self._row("2", "2")])

    def test_onepage_desc(self):
        script = KeysetTestHelper([
            self._step(
                [],
                [self._row("2", "2"), self._row("1", "1")],
                direction="desc",
            ),
            self._step(
                [["created_at", "<=", "1"], ["uuid", "!=", "1"]],
                [],
                direction="desc",
            ),
        ])

        got = list(arvados.util.keyset_list_all(script.fn, ascending=False))
        self.assertEqual(got, [self._row("2", "2"), self._row("1", "1")])

    @parameterized.parameterized.expand(zip(
        itertools.cycle(_SELECT_FAKE_ITEM),
        itertools.chain.from_iterable(
            itertools.combinations(_SELECT_FAKE_ITEM, count)
            for count in range(len(_SELECT_FAKE_ITEM) + 1)
        ),
    ))
    def test_select(self, order_key, select):
        # keyset_list_all must have both uuid and order_key to function.
        # Test that it selects those fields along with user-specified ones.
        expect_select = set(select) | {'uuid', order_key}
        item = {
            field: _SELECT_FAKE_ITEM[field]
            for field in _SELECT_FAKE_ITEM
            if field in expect_select
        }
        list_func = mock.Mock()
        # Configure the object returned by list_func(...) without calling it,
        # so no call is recorded before keyset_list_all runs.
        list_func.return_value.execute = mock.Mock(side_effect=[
            {'items': [item]},
            {'items': []},
            {'items': []},
        ])
        actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select)))
        self.assertEqual(actual, [item])
        calls = list_func.call_args_list
        self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
        for _args, kwargs in calls:
            self.assertEqual(set(kwargs.get('select', ())), expect_select)
222
223
class TestBaseDirectories:
    """pytest tests for arvados.util._BaseDirectories search and storage paths.

    Every test builds its own environment mapping and directory spec from
    fixtures, so nothing depends on the real process environment.
    """

    SELF_PATH = Path(__file__)

    @pytest.fixture
    def dir_spec(self, tmp_path):
        """Directory spec with test-only variable names and tmp_path defaults."""
        default_dirs = ':'.join(
            str(tmp_path / name) for name in ('.test1', '.test2')
        )
        return arvados.util._BaseDirectorySpec(
            'TEST_DIRECTORY',
            'XDG_TEST_HOME',
            Path('.test'),
            'XDG_TEST_DIRS',
            default_dirs,
        )

    @pytest.fixture
    def env(self, tmp_path):
        """Environment mapping whose only entry points HOME at tmp_path."""
        home = str(tmp_path)
        return {'HOME': home}

    @pytest.fixture
    def umask(self):
        """Run the test under umask 0o002 and restore the previous umask."""
        previous = os.umask(0o002)
        try:
            yield
        finally:
            os.umask(previous)

    def test_search_systemd_dirs(self, dir_spec, env, tmp_path):
        """This file is found through the second systemd-style directory."""
        env['TEST_DIRECTORY'] = ':'.join([str(tmp_path), str(self.SELF_PATH.parent)])
        finder = arvados.util._BaseDirectories(dir_spec, env, 'tests')
        found = list(finder.search(self.SELF_PATH.name))
        assert found == [self.SELF_PATH]

    def test_search_xdg_home(self, dir_spec, env, tmp_path):
        """This file is found under the configured XDG home plus subdirectory."""
        grandparent = self.SELF_PATH.parent.parent
        env['XDG_TEST_HOME'] = str(grandparent)
        finder = arvados.util._BaseDirectories(dir_spec, env, 'tests')
        found = list(finder.search(self.SELF_PATH.name))
        assert found == [self.SELF_PATH]

    def test_search_xdg_dirs(self, dir_spec, env, tmp_path):
        """This file is found through the second entry of the XDG dirs list."""
        grandparent = self.SELF_PATH.parent.parent
        env['XDG_TEST_DIRS'] = ':'.join([str(tmp_path), str(grandparent)])
        finder = arvados.util._BaseDirectories(dir_spec, env, 'tests')
        found = list(finder.search(self.SELF_PATH.name))
        assert found == [self.SELF_PATH]

    def test_search_all_dirs(self, dir_spec, env, tmp_path):
        """With all three variables set, the file is reported once per source."""
        tests_dir = self.SELF_PATH.parent
        grandparent = tests_dir.parent
        env.update({
            'TEST_DIRECTORY': ':'.join([str(tmp_path), str(tests_dir)]),
            'XDG_TEST_HOME': str(grandparent),
            'XDG_TEST_DIRS': ':'.join([str(tmp_path), str(grandparent)]),
        })
        finder = arvados.util._BaseDirectories(dir_spec, env, 'tests')
        found = list(finder.search(self.SELF_PATH.name))
        assert found == [self.SELF_PATH] * 3

    def test_search_default_home(self, dir_spec, env, tmp_path):
        """With no variables set, the default home under $HOME is searched."""
        home_dir = tmp_path / dir_spec.xdg_home_default
        home_dir.mkdir()
        target = home_dir / 'default_home'
        target.touch()
        finder = arvados.util._BaseDirectories(dir_spec, env, '.')
        found = list(finder.search(target.name))
        assert found == [target]

    def test_search_default_dirs(self, dir_spec, env, tmp_path):
        """With no variables set, the default XDG dirs list is searched."""
        # Only the last directory of the default list is created.
        last_dir = Path(dir_spec.xdg_dirs_default.rpartition(':')[-1])
        last_dir.mkdir()
        target = last_dir / 'default_dirs'
        target.touch()
        finder = arvados.util._BaseDirectories(dir_spec, env, '.')
        found = list(finder.search(target.name))
        assert found == [target]

    def test_search_no_default_dirs(self, dir_spec, env, tmp_path):
        """Without an XDG dirs key or default, only the home copy is found."""
        dir_spec.xdg_dirs_key = None
        dir_spec.xdg_dirs_default = None
        # Create the same file name in all three places.
        for subdir in ('.test1', '.test2', dir_spec.xdg_home_default):
            holder = tmp_path / subdir
            holder.mkdir()
            (holder / 'no_dirs').touch()
        home_copy = tmp_path / dir_spec.xdg_home_default / 'no_dirs'
        finder = arvados.util._BaseDirectories(dir_spec, env, '.')
        found = list(finder.search(home_copy.name))
        assert found == [home_copy]

    def test_ignore_relative_directories(self, dir_spec, env, tmp_path):
        """Relative paths in any of the variables yield no search results."""
        rel_file = Path(*self.SELF_PATH.parts[-2:])
        assert rel_file.exists(), "test setup problem: need an existing file in a subdirectory of ."
        rel_parent = str(rel_file.parent)
        env.update({
            'TEST_DIRECTORY': '.',
            'XDG_TEST_HOME': rel_parent,
            'XDG_TEST_DIRS': rel_parent,
        })
        finder = arvados.util._BaseDirectories(dir_spec, env, rel_parent)
        assert list(finder.search(rel_file.name)) == []

    def test_search_warns_nondefault_home(self, dir_spec, env, tmp_path, caplog):
        """A file present only at the default home triggers a logged hint."""
        config_file = tmp_path / dir_spec.xdg_home_default / 'Search' / 'SearchConfig'
        config_file.parent.mkdir(parents=True)
        config_file.touch()
        # Point the configured home at a directory that is never created.
        env[dir_spec.xdg_home_key] = str(tmp_path / '.nonexistent')
        finder = arvados.util._BaseDirectories(dir_spec, env, config_file.parent.name)
        found = list(finder.search(config_file.name))
        message_args = (
            Path(*config_file.parts[-2:]),
            dir_spec.xdg_home_key,
            env[dir_spec.xdg_home_key],
            Path(*config_file.parts[:-2]),
        )
        expect_msg = "{} was not found under your configured ${} ({}), but does exist at the default location ({})".format(*message_args)
        assert caplog.messages
        assert any(logged.startswith(expect_msg) for logged in caplog.messages)
        assert not found

    def test_storage_path_systemd(self, dir_spec, env, tmp_path):
        """An existing systemd-style directory is used as the storage path."""
        rw_dir = tmp_path / 'rwsystemd'
        rw_dir.mkdir(0o700)
        env['TEST_DIRECTORY'] = str(rw_dir)
        storage = arvados.util._BaseDirectories(dir_spec, env)
        assert storage.storage_path() == rw_dir

    def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path):
        """The 0o700 directory is chosen over the 0o500 one listed before it."""
        ro_dir = tmp_path / 'rodir'
        ro_dir.mkdir(0o500)
        rw_dir = tmp_path / 'rwdir'
        rw_dir.mkdir(0o700)
        env['TEST_DIRECTORY'] = ':'.join([str(ro_dir), str(rw_dir)])
        storage = arvados.util._BaseDirectories(dir_spec, env)
        assert storage.storage_path() == rw_dir

    def test_storage_path_xdg_home(self, dir_spec, env, tmp_path):
        """The storage path is a user-writable directory under the XDG home."""
        xdg_home = tmp_path / '.xdghome'
        env['XDG_TEST_HOME'] = str(xdg_home)
        target = xdg_home / 'arvados'
        storage = arvados.util._BaseDirectories(dir_spec, env)
        assert storage.storage_path() == target
        wanted_bits = stat.S_IFDIR | stat.S_IWUSR
        assert (target.stat().st_mode & wanted_bits) == wanted_bits

    def test_storage_path_default(self, dir_spec, env, tmp_path):
        """With no variables set, storage lands under the default home."""
        target = tmp_path / dir_spec.xdg_home_default / 'arvados'
        storage = arvados.util._BaseDirectories(dir_spec, env)
        assert storage.storage_path() == target
        wanted_bits = stat.S_IFDIR | stat.S_IWUSR
        assert (target.stat().st_mode & wanted_bits) == wanted_bits

    @pytest.mark.parametrize('subdir,mode', [
        ('str/dir', 0o750),
        (Path('sub', 'path'), 0o770),
    ])
    def test_storage_path_subdir(self, dir_spec, env, umask, tmp_path, subdir, mode):
        """A requested subdirectory gets the requested mode and no 'other' bits."""
        target = tmp_path / dir_spec.xdg_home_default / 'arvados' / subdir
        storage = arvados.util._BaseDirectories(dir_spec, env)
        created = storage.storage_path(subdir, mode)
        assert created == target
        wanted_bits = stat.S_IFDIR | mode
        created_mode = created.stat().st_mode
        assert (created_mode & wanted_bits) == wanted_bits
        assert (created_mode & stat.S_IRWXO) == 0

    def test_empty_xdg_home(self, dir_spec, env, tmp_path):
        """An empty XDG home variable falls back to the default home."""
        env['XDG_TEST_HOME'] = ''
        target = tmp_path / dir_spec.xdg_home_default / 'emptyhome'
        storage = arvados.util._BaseDirectories(dir_spec, env, target.name)
        assert storage.storage_path() == target

    def test_empty_xdg_dirs(self, dir_spec, env, tmp_path):
        """An empty XDG dirs variable falls back to the default dirs list."""
        env['XDG_TEST_DIRS'] = ''
        last_dir = Path(dir_spec.xdg_dirs_default.rpartition(':')[-1])
        last_dir.mkdir()
        target = last_dir / 'empty_dirs'
        target.touch()
        finder = arvados.util._BaseDirectories(dir_spec, env, '.')
        found = list(finder.search(target.name))
        assert found == [target]

    def test_spec_key_lookup(self):
        """A spec can be selected by its string name."""
        spec = arvados.util._BaseDirectories('CACHE')._spec
        assert spec.systemd_key == 'CACHE_DIRECTORY'
        assert spec.xdg_dirs_key is None

    def test_spec_enum_lookup(self):
        """A spec can be selected by its _BaseDirectorySpecs enum member."""
        chosen = arvados.util._BaseDirectorySpecs.CONFIG
        spec = arvados.util._BaseDirectories(chosen)._spec
        assert spec.systemd_key == 'CONFIGURATION_DIRECTORY'