]> git.arvados.org - arvados.git/blob - sdk/python/tests/test_util.py
21020: Make arvados.api.http_cache get a path from the environment
[arvados.git] / sdk / python / tests / test_util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import itertools
6 import os
7 import stat
8 import subprocess
9 import unittest
10
11 import parameterized
12 import pytest
13 from pathlib import Path
14 from unittest import mock
15
16 import arvados
17 import arvados.util
18
19 class MkdirDashPTest(unittest.TestCase):
20     def setUp(self):
21         try:
22             os.path.mkdir('./tmp')
23         except:
24             pass
25     def tearDown(self):
26         try:
27             os.unlink('./tmp/bar')
28             os.rmdir('./tmp/foo')
29             os.rmdir('./tmp')
30         except:
31             pass
32     def runTest(self):
33         arvados.util.mkdir_dash_p('./tmp/foo')
34         with open('./tmp/bar', 'wb') as f:
35             f.write(b'bar')
36         self.assertRaises(OSError, arvados.util.mkdir_dash_p, './tmp/bar')
37
38
39 class RunCommandTestCase(unittest.TestCase):
40     def test_success(self):
41         stdout, stderr = arvados.util.run_command(['echo', 'test'],
42                                                   stderr=subprocess.PIPE)
43         self.assertEqual("test\n".encode(), stdout)
44         self.assertEqual("".encode(), stderr)
45
46     def test_failure(self):
47         with self.assertRaises(arvados.errors.CommandFailedError):
48             arvados.util.run_command(['false'])
49
50 class KeysetTestHelper:
51     def __init__(self, expect):
52         self.n = 0
53         self.expect = expect
54
55     def fn(self, **kwargs):
56         if self.expect[self.n][0] != kwargs:
57             raise Exception("Didn't match %s != %s" % (self.expect[self.n][0], kwargs))
58         return self
59
60     def execute(self, num_retries):
61         self.n += 1
62         return self.expect[self.n-1][1]
63
64 _SELECT_FAKE_ITEM = {
65     'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
66     'name': 'KeysetListAllTestCase.test_select mock',
67     'created_at': '2023-08-28T12:34:56.123456Z',
68 }
69
70 class KeysetListAllTestCase(unittest.TestCase):
71     def test_empty(self):
72         ks = KeysetTestHelper([[
73             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
74             {"items": []}
75         ]])
76
77         ls = list(arvados.util.keyset_list_all(ks.fn))
78         self.assertEqual(ls, [])
79
80     def test_oneitem(self):
81         ks = KeysetTestHelper([[
82             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
83             {"items": [{"created_at": "1", "uuid": "1"}]}
84         ], [
85             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]},
86             {"items": []}
87         ],[
88             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]},
89             {"items": []}
90         ]])
91
92         ls = list(arvados.util.keyset_list_all(ks.fn))
93         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}])
94
95     def test_onepage2(self):
96         ks = KeysetTestHelper([[
97             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
98             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
99         ], [
100             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
101             {"items": []}
102         ]])
103
104         ls = list(arvados.util.keyset_list_all(ks.fn))
105         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
106
107     def test_onepage3(self):
108         ks = KeysetTestHelper([[
109             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
110             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]}
111         ], [
112             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]},
113             {"items": []}
114         ]])
115
116         ls = list(arvados.util.keyset_list_all(ks.fn))
117         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}])
118
119
120     def test_twopage(self):
121         ks = KeysetTestHelper([[
122             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
123             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
124         ], [
125             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
126             {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]}
127         ], [
128             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]},
129             {"items": []}
130         ]])
131
132         ls = list(arvados.util.keyset_list_all(ks.fn))
133         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
134                               {"created_at": "2", "uuid": "2"},
135                               {"created_at": "3", "uuid": "3"},
136                               {"created_at": "4", "uuid": "4"}
137         ])
138
139     def test_repeated_key(self):
140         ks = KeysetTestHelper([[
141             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
142             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]}
143         ], [
144             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]},
145             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]}
146         ], [
147             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]},
148             {"items": []}
149         ], [
150             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]},
151             {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]}
152         ], [
153             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]},
154             {"items": []}
155         ],
156         ])
157
158         ls = list(arvados.util.keyset_list_all(ks.fn))
159         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
160                               {"created_at": "2", "uuid": "2"},
161                               {"created_at": "2", "uuid": "3"},
162                               {"created_at": "2", "uuid": "4"},
163                               {"created_at": "3", "uuid": "5"},
164                               {"created_at": "4", "uuid": "6"}
165         ])
166
167     def test_onepage_withfilter(self):
168         ks = KeysetTestHelper([[
169             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]},
170             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
171         ], [
172             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]},
173             {"items": []}
174         ]])
175
176         ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
177         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
178
179     def test_onepage_desc(self):
180         ks = KeysetTestHelper([[
181             {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
182             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
183         ], [
184             {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
185             {"items": []}
186         ]])
187
188         ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
189         self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])
190
191     @parameterized.parameterized.expand(zip(
192         itertools.cycle(_SELECT_FAKE_ITEM),
193         itertools.chain.from_iterable(
194             itertools.combinations(_SELECT_FAKE_ITEM, count)
195             for count in range(len(_SELECT_FAKE_ITEM) + 1)
196         ),
197     ))
198     def test_select(self, order_key, select):
199         # keyset_list_all must have both uuid and order_key to function.
200         # Test that it selects those fields along with user-specified ones.
201         expect_select = {'uuid', order_key, *select}
202         item = {
203             key: value
204             for key, value in _SELECT_FAKE_ITEM.items()
205             if key in expect_select
206         }
207         list_func = mock.Mock()
208         list_func().execute = mock.Mock(
209             side_effect=[
210                 {'items': [item]},
211                 {'items': []},
212                 {'items': []},
213             ],
214         )
215         list_func.reset_mock()
216         actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select)))
217         self.assertEqual(actual, [item])
218         calls = list_func.call_args_list
219         self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
220         for args, kwargs in calls:
221             self.assertEqual(set(kwargs.get('select', ())), expect_select)
222
223
224 class TestBaseDirectories:
225     SELF_PATH = Path(__file__)
226
227     @pytest.fixture
228     def dir_spec(self, tmp_path):
229         return arvados.util._BaseDirectorySpec(
230             'TEST_DIRECTORY',
231             'XDG_TEST_HOME',
232             Path('.test'),
233             'XDG_TEST_DIRS',
234             f"{tmp_path / '.test1'}:{tmp_path / '.test2'}",
235         )
236
237     @pytest.fixture
238     def env(self, tmp_path):
239         return {'HOME': str(tmp_path)}
240
241     def test_search_systemd_dirs(self, dir_spec, env, tmp_path):
242         env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
243         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
244         actual = list(dirs.search(self.SELF_PATH.name))
245         assert actual == [self.SELF_PATH]
246
247     def test_search_xdg_home(self, dir_spec, env, tmp_path):
248         env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent)
249         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
250         actual = list(dirs.search(self.SELF_PATH.name))
251         assert actual == [self.SELF_PATH]
252
253     def test_search_xdg_dirs(self, dir_spec, env, tmp_path):
254         env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
255         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
256         actual = list(dirs.search(self.SELF_PATH.name))
257         assert actual == [self.SELF_PATH]
258
259     def test_search_all_dirs(self, dir_spec, env, tmp_path):
260         env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
261         env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent)
262         env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
263         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
264         actual = list(dirs.search(self.SELF_PATH.name))
265         assert actual == [self.SELF_PATH, self.SELF_PATH, self.SELF_PATH]
266
267     def test_search_default_home(self, dir_spec, env, tmp_path):
268         expected = tmp_path / dir_spec.xdg_home_default / 'default_home'
269         expected.parent.mkdir()
270         expected.touch()
271         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
272         actual = list(dirs.search(expected.name))
273         assert actual == [expected]
274
275     def test_search_default_dirs(self, dir_spec, env, tmp_path):
276         _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
277         expected = Path(default_dir, 'default_dirs')
278         expected.parent.mkdir()
279         expected.touch()
280         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
281         actual = list(dirs.search(expected.name))
282         assert actual == [expected]
283
284     def test_search_no_default_dirs(self, dir_spec, env, tmp_path):
285         dir_spec.xdg_dirs_key = None
286         dir_spec.xdg_dirs_default = None
287         for subdir in ['.test1', '.test2', dir_spec.xdg_home_default]:
288             expected = tmp_path / subdir / 'no_dirs'
289             expected.parent.mkdir()
290             expected.touch()
291         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
292         actual = list(dirs.search(expected.name))
293         assert actual == [expected]
294
295     def test_ignore_relative_directories(self, dir_spec, env, tmp_path):
296         test_path = Path(*self.SELF_PATH.parts[-2:])
297         assert test_path.exists(), "test setup problem: need an existing file in a subdirectory of ."
298         parent_path = str(test_path.parent)
299         env['TEST_DIRECTORY'] = '.'
300         env['XDG_TEST_HOME'] = parent_path
301         env['XDG_TEST_DIRS'] = parent_path
302         dirs = arvados.util._BaseDirectories(dir_spec, env, parent_path)
303         assert not list(dirs.search(test_path.name))
304
305     def test_storage_path_systemd(self, dir_spec, env, tmp_path):
306         expected = tmp_path / 'rwsystemd'
307         expected.mkdir(0o700)
308         env['TEST_DIRECTORY'] = str(expected)
309         dirs = arvados.util._BaseDirectories(dir_spec, env)
310         assert dirs.storage_path() == expected
311
312     def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path):
313         rodir = tmp_path / 'rodir'
314         rodir.mkdir(0o500)
315         expected = tmp_path / 'rwdir'
316         expected.mkdir(0o700)
317         env['TEST_DIRECTORY'] = f'{rodir}:{expected}'
318         dirs = arvados.util._BaseDirectories(dir_spec, env)
319         assert dirs.storage_path() == expected
320
321     def test_storage_path_xdg_home(self, dir_spec, env, tmp_path):
322         expected = tmp_path / '.xdghome' / 'arvados'
323         env['XDG_TEST_HOME'] = str(expected.parent)
324         dirs = arvados.util._BaseDirectories(dir_spec, env)
325         assert dirs.storage_path() == expected
326         exp_mode = stat.S_IFDIR | stat.S_IWUSR
327         assert (expected.stat().st_mode & exp_mode) == exp_mode
328
329     def test_storage_path_default(self, dir_spec, env, tmp_path):
330         expected = tmp_path / dir_spec.xdg_home_default / 'arvados'
331         dirs = arvados.util._BaseDirectories(dir_spec, env)
332         assert dirs.storage_path() == expected
333         exp_mode = stat.S_IFDIR | stat.S_IWUSR
334         assert (expected.stat().st_mode & exp_mode) == exp_mode
335
336     def test_empty_xdg_home(self, dir_spec, env, tmp_path):
337         env['XDG_TEST_HOME'] = ''
338         expected = tmp_path / dir_spec.xdg_home_default / 'emptyhome'
339         dirs = arvados.util._BaseDirectories(dir_spec, env, expected.name)
340         assert dirs.storage_path() == expected
341
342     def test_empty_xdg_dirs(self, dir_spec, env, tmp_path):
343         env['XDG_TEST_DIRS'] = ''
344         _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
345         expected = Path(default_dir, 'empty_dirs')
346         expected.parent.mkdir()
347         expected.touch()
348         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
349         actual = list(dirs.search(expected.name))
350         assert actual == [expected]
351
352     def test_spec_key_lookup(self):
353         dirs = arvados.util._BaseDirectories('CACHE')
354         assert dirs._spec.systemd_key == 'CACHE_DIRECTORY'
355         assert dirs._spec.xdg_dirs_key is None
356
357     def test_spec_enum_lookup(self):
358         dirs = arvados.util._BaseDirectories(arvados.util._BaseDirectorySpecs.CONFIG)
359         assert dirs._spec.systemd_key == 'CONFIGURATION_DIRECTORY'