Merge branch 'zoe-translates/python-sdk-arv_copy-thread-exception'
[arvados.git] / sdk / python / tests / test_util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import itertools
6 import os
7 import stat
8 import subprocess
9 import unittest
10
11 import parameterized
12 import pytest
13 from pathlib import Path
14 from unittest import mock
15
16 import arvados
17 import arvados.util
18
19
20 class KeysetTestHelper:
21     def __init__(self, expect):
22         self.n = 0
23         self.expect = expect
24
25     def fn(self, **kwargs):
26         if self.expect[self.n][0] != kwargs:
27             raise Exception("Didn't match %s != %s" % (self.expect[self.n][0], kwargs))
28         return self
29
30     def execute(self, num_retries):
31         self.n += 1
32         return self.expect[self.n-1][1]
33
34 _SELECT_FAKE_ITEM = {
35     'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
36     'name': 'KeysetListAllTestCase.test_select mock',
37     'created_at': '2023-08-28T12:34:56.123456Z',
38 }
39
40 class KeysetListAllTestCase(unittest.TestCase):
41     def test_empty(self):
42         ks = KeysetTestHelper([[
43             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
44             {"items": []}
45         ]])
46
47         ls = list(arvados.util.keyset_list_all(ks.fn))
48         self.assertEqual(ls, [])
49
50     def test_oneitem(self):
51         ks = KeysetTestHelper([[
52             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
53             {"items": [{"created_at": "1", "uuid": "1"}]}
54         ], [
55             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]},
56             {"items": []}
57         ],[
58             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]},
59             {"items": []}
60         ]])
61
62         ls = list(arvados.util.keyset_list_all(ks.fn))
63         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}])
64
65     def test_onepage2(self):
66         ks = KeysetTestHelper([[
67             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
68             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
69         ], [
70             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
71             {"items": []}
72         ]])
73
74         ls = list(arvados.util.keyset_list_all(ks.fn))
75         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
76
77     def test_onepage3(self):
78         ks = KeysetTestHelper([[
79             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
80             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]}
81         ], [
82             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]},
83             {"items": []}
84         ]])
85
86         ls = list(arvados.util.keyset_list_all(ks.fn))
87         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}])
88
89
90     def test_twopage(self):
91         ks = KeysetTestHelper([[
92             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
93             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
94         ], [
95             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
96             {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]}
97         ], [
98             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]},
99             {"items": []}
100         ]])
101
102         ls = list(arvados.util.keyset_list_all(ks.fn))
103         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
104                               {"created_at": "2", "uuid": "2"},
105                               {"created_at": "3", "uuid": "3"},
106                               {"created_at": "4", "uuid": "4"}
107         ])
108
109     def test_repeated_key(self):
110         ks = KeysetTestHelper([[
111             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
112             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]}
113         ], [
114             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]},
115             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]}
116         ], [
117             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]},
118             {"items": []}
119         ], [
120             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]},
121             {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]}
122         ], [
123             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]},
124             {"items": []}
125         ],
126         ])
127
128         ls = list(arvados.util.keyset_list_all(ks.fn))
129         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
130                               {"created_at": "2", "uuid": "2"},
131                               {"created_at": "2", "uuid": "3"},
132                               {"created_at": "2", "uuid": "4"},
133                               {"created_at": "3", "uuid": "5"},
134                               {"created_at": "4", "uuid": "6"}
135         ])
136
137     def test_onepage_withfilter(self):
138         ks = KeysetTestHelper([[
139             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]},
140             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
141         ], [
142             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]},
143             {"items": []}
144         ]])
145
146         ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
147         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
148
149     def test_onepage_desc(self):
150         ks = KeysetTestHelper([[
151             {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
152             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
153         ], [
154             {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
155             {"items": []}
156         ]])
157
158         ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
159         self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])
160
161     @parameterized.parameterized.expand(zip(
162         itertools.cycle(_SELECT_FAKE_ITEM),
163         itertools.chain.from_iterable(
164             itertools.combinations(_SELECT_FAKE_ITEM, count)
165             for count in range(len(_SELECT_FAKE_ITEM) + 1)
166         ),
167     ))
168     def test_select(self, order_key, select):
169         # keyset_list_all must have both uuid and order_key to function.
170         # Test that it selects those fields along with user-specified ones.
171         expect_select = {'uuid', order_key, *select}
172         item = {
173             key: value
174             for key, value in _SELECT_FAKE_ITEM.items()
175             if key in expect_select
176         }
177         list_func = mock.Mock()
178         list_func().execute = mock.Mock(
179             side_effect=[
180                 {'items': [item]},
181                 {'items': []},
182                 {'items': []},
183             ],
184         )
185         list_func.reset_mock()
186         actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select)))
187         self.assertEqual(actual, [item])
188         calls = list_func.call_args_list
189         self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
190         for args, kwargs in calls:
191             self.assertEqual(set(kwargs.get('select', ())), expect_select)
192
193
194 class TestBaseDirectories:
195     SELF_PATH = Path(__file__)
196
197     @pytest.fixture
198     def dir_spec(self, tmp_path):
199         return arvados.util._BaseDirectorySpec(
200             'TEST_DIRECTORY',
201             'XDG_TEST_HOME',
202             Path('.test'),
203             'XDG_TEST_DIRS',
204             f"{tmp_path / '.test1'}:{tmp_path / '.test2'}",
205         )
206
207     @pytest.fixture
208     def env(self, tmp_path):
209         return {'HOME': str(tmp_path)}
210
211     @pytest.fixture
212     def umask(self):
213         orig_umask = os.umask(0o002)
214         try:
215             yield
216         finally:
217             os.umask(orig_umask)
218
219     def test_search_systemd_dirs(self, dir_spec, env, tmp_path):
220         env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
221         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
222         actual = list(dirs.search(self.SELF_PATH.name))
223         assert actual == [self.SELF_PATH]
224
225     def test_search_xdg_home(self, dir_spec, env, tmp_path):
226         env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent)
227         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
228         actual = list(dirs.search(self.SELF_PATH.name))
229         assert actual == [self.SELF_PATH]
230
231     def test_search_xdg_dirs(self, dir_spec, env, tmp_path):
232         env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
233         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
234         actual = list(dirs.search(self.SELF_PATH.name))
235         assert actual == [self.SELF_PATH]
236
237     def test_search_all_dirs(self, dir_spec, env, tmp_path):
238         env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}'
239         env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent)
240         env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}'
241         dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests')
242         actual = list(dirs.search(self.SELF_PATH.name))
243         assert actual == [self.SELF_PATH, self.SELF_PATH, self.SELF_PATH]
244
245     def test_search_default_home(self, dir_spec, env, tmp_path):
246         expected = tmp_path / dir_spec.xdg_home_default / 'default_home'
247         expected.parent.mkdir()
248         expected.touch()
249         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
250         actual = list(dirs.search(expected.name))
251         assert actual == [expected]
252
253     def test_search_default_dirs(self, dir_spec, env, tmp_path):
254         _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
255         expected = Path(default_dir, 'default_dirs')
256         expected.parent.mkdir()
257         expected.touch()
258         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
259         actual = list(dirs.search(expected.name))
260         assert actual == [expected]
261
262     def test_search_no_default_dirs(self, dir_spec, env, tmp_path):
263         dir_spec.xdg_dirs_key = None
264         dir_spec.xdg_dirs_default = None
265         for subdir in ['.test1', '.test2', dir_spec.xdg_home_default]:
266             expected = tmp_path / subdir / 'no_dirs'
267             expected.parent.mkdir()
268             expected.touch()
269         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
270         actual = list(dirs.search(expected.name))
271         assert actual == [expected]
272
273     def test_ignore_relative_directories(self, dir_spec, env, tmp_path):
274         test_path = Path(*self.SELF_PATH.parts[-2:])
275         assert test_path.exists(), "test setup problem: need an existing file in a subdirectory of ."
276         parent_path = str(test_path.parent)
277         env['TEST_DIRECTORY'] = '.'
278         env['XDG_TEST_HOME'] = parent_path
279         env['XDG_TEST_DIRS'] = parent_path
280         dirs = arvados.util._BaseDirectories(dir_spec, env, parent_path)
281         assert not list(dirs.search(test_path.name))
282
283     def test_search_warns_nondefault_home(self, dir_spec, env, tmp_path, caplog):
284         search_path = tmp_path / dir_spec.xdg_home_default / 'Search' / 'SearchConfig'
285         search_path.parent.mkdir(parents=True)
286         search_path.touch()
287         env[dir_spec.xdg_home_key] = str(tmp_path / '.nonexistent')
288         dirs = arvados.util._BaseDirectories(dir_spec, env, search_path.parent.name)
289         results = list(dirs.search(search_path.name))
290         expect_msg = "{} was not found under your configured ${} ({}), but does exist at the default location ({})".format(
291             Path(*search_path.parts[-2:]),
292             dir_spec.xdg_home_key,
293             env[dir_spec.xdg_home_key],
294             Path(*search_path.parts[:-2]),
295         )
296         assert caplog.messages
297         assert any(msg.startswith(expect_msg) for msg in caplog.messages)
298         assert not results
299
300     def test_storage_path_systemd(self, dir_spec, env, tmp_path):
301         expected = tmp_path / 'rwsystemd'
302         expected.mkdir(0o700)
303         env['TEST_DIRECTORY'] = str(expected)
304         dirs = arvados.util._BaseDirectories(dir_spec, env)
305         assert dirs.storage_path() == expected
306
307     def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path):
308         rodir = tmp_path / 'rodir'
309         rodir.mkdir(0o500)
310         expected = tmp_path / 'rwdir'
311         expected.mkdir(0o700)
312         env['TEST_DIRECTORY'] = f'{rodir}:{expected}'
313         dirs = arvados.util._BaseDirectories(dir_spec, env)
314         assert dirs.storage_path() == expected
315
316     def test_storage_path_xdg_home(self, dir_spec, env, tmp_path):
317         expected = tmp_path / '.xdghome' / 'arvados'
318         env['XDG_TEST_HOME'] = str(expected.parent)
319         dirs = arvados.util._BaseDirectories(dir_spec, env)
320         assert dirs.storage_path() == expected
321         exp_mode = stat.S_IFDIR | stat.S_IWUSR
322         assert (expected.stat().st_mode & exp_mode) == exp_mode
323
324     def test_storage_path_default(self, dir_spec, env, tmp_path):
325         expected = tmp_path / dir_spec.xdg_home_default / 'arvados'
326         dirs = arvados.util._BaseDirectories(dir_spec, env)
327         assert dirs.storage_path() == expected
328         exp_mode = stat.S_IFDIR | stat.S_IWUSR
329         assert (expected.stat().st_mode & exp_mode) == exp_mode
330
331     @pytest.mark.parametrize('subdir,mode', [
332         ('str/dir', 0o750),
333         (Path('sub', 'path'), 0o770),
334     ])
335     def test_storage_path_subdir(self, dir_spec, env, umask, tmp_path, subdir, mode):
336         expected = tmp_path / dir_spec.xdg_home_default / 'arvados' / subdir
337         dirs = arvados.util._BaseDirectories(dir_spec, env)
338         actual = dirs.storage_path(subdir, mode)
339         assert actual == expected
340         expect_mode = mode | stat.S_IFDIR
341         actual_mode = actual.stat().st_mode
342         assert (actual_mode & expect_mode) == expect_mode
343         assert not (actual_mode & stat.S_IRWXO)
344
345     def test_empty_xdg_home(self, dir_spec, env, tmp_path):
346         env['XDG_TEST_HOME'] = ''
347         expected = tmp_path / dir_spec.xdg_home_default / 'emptyhome'
348         dirs = arvados.util._BaseDirectories(dir_spec, env, expected.name)
349         assert dirs.storage_path() == expected
350
351     def test_empty_xdg_dirs(self, dir_spec, env, tmp_path):
352         env['XDG_TEST_DIRS'] = ''
353         _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':')
354         expected = Path(default_dir, 'empty_dirs')
355         expected.parent.mkdir()
356         expected.touch()
357         dirs = arvados.util._BaseDirectories(dir_spec, env, '.')
358         actual = list(dirs.search(expected.name))
359         assert actual == [expected]
360
361     def test_spec_key_lookup(self):
362         dirs = arvados.util._BaseDirectories('CACHE')
363         assert dirs._spec.systemd_key == 'CACHE_DIRECTORY'
364         assert dirs._spec.xdg_dirs_key is None
365
366     def test_spec_enum_lookup(self):
367         dirs = arvados.util._BaseDirectories(arvados.util._BaseDirectorySpecs.CONFIG)
368         assert dirs._spec.systemd_key == 'CONFIGURATION_DIRECTORY'