# Copyright (C) The Arvados Authors. All rights reserved. # # SPDX-License-Identifier: Apache-2.0 import itertools import os import stat import subprocess import unittest import parameterized import pytest from pathlib import Path from unittest import mock import arvados import arvados.util class KeysetTestHelper: def __init__(self, expect, expect_num_retries=0): self.n = 0 self.expect = expect self.expect_num_retries = expect_num_retries def fn(self, **kwargs): if self.expect[self.n][0] != kwargs: raise Exception("Didn't match %s != %s" % (self.expect[self.n][0], kwargs)) return self def execute(self, num_retries): if num_retries != self.expect_num_retries: raise Exception(f"KeysetTestHelper called with num_retries={num_retries} but expected {expect_num_retries}") self.n += 1 return self.expect[self.n-1][1] _SELECT_FAKE_ITEM = { 'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww', 'name': 'KeysetListAllTestCase.test_select mock', 'created_at': '2023-08-28T12:34:56.123456Z', } _FAKE_COMPUTED_PERMISSIONS_ITEM = { 'user_uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww', 'target_uuid': 'zzzzz-ttttt-xxxxxyyyyyzzzzz', 'perm_level': 'can_write', } class KeysetListAllTestCase(unittest.TestCase): def test_empty(self): ks = KeysetTestHelper([[ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []}, {"items": []} ]]) ls = list(arvados.util.keyset_list_all(ks.fn)) self.assertEqual(ls, []) def test_oneitem(self): ks = KeysetTestHelper([[ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []}, {"items": [{"created_at": "1", "uuid": "1"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]}, {"items": []} ],[ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]}, {"items": []} ]]) ls = list(arvados.util.keyset_list_all(ks.fn)) self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}]) def test_onepage2(self): ks = KeysetTestHelper([[ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []}, {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]}, {"items": []} ]]) ls = list(arvados.util.keyset_list_all(ks.fn)) self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]) def test_onepage3(self): ks = KeysetTestHelper([[ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []}, {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]}, {"items": []} ]]) ls = list(arvados.util.keyset_list_all(ks.fn)) self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]) def test_twopage(self): ks = KeysetTestHelper([[ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []}, {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]}, {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]}, {"items": []} ]]) ls = list(arvados.util.keyset_list_all(ks.fn)) self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"} ]) def test_repeated_key(self): ks = KeysetTestHelper([[ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []}, {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]}, {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]}, {"items": []} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]}, {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]}, {"items": []} ], ]) ls = list(arvados.util.keyset_list_all(ks.fn)) self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}, {"created_at": "2", "uuid": "4"}, {"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"} ]) def test_onepage_withfilter(self): ks = KeysetTestHelper([[ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]}, {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]}, {"items": []} ]]) ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]])) self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]) def test_onepage_desc(self): ks = KeysetTestHelper([[ {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []}, {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]} ], [ {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]}, {"items": []} ]]) ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False)) self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]) @parameterized.parameterized.expand( (fake_item, key_fields, order_key, select) for (fake_item, key_fields) in [ (_SELECT_FAKE_ITEM, ('uuid',)), (_FAKE_COMPUTED_PERMISSIONS_ITEM, ('user_uuid', 'target_uuid')), ] for order_key in fake_item if order_key != 'perm_level' for count in range(len(fake_item) + 1) for select in itertools.combinations(fake_item, count) ) def test_select(self, fake_item, key_fields, order_key, select): # keyset_list_all must have both uuid and order_key to function. # Test that it selects those fields along with user-specified ones. expect_select = {*key_fields, order_key, *select} item = { key: value for key, value in fake_item.items() if key in expect_select } list_func = mock.Mock() list_func().execute = mock.Mock( side_effect=[ {'items': [item]}, {'items': []}, {'items': []}, ], ) list_func.reset_mock() actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select), key_fields=key_fields)) self.assertEqual(actual, [item]) calls = list_func.call_args_list self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items") for args, kwargs in calls: self.assertEqual(set(kwargs.get('select', ())), expect_select) class TestBaseDirectories: SELF_PATH = Path(__file__) @pytest.fixture def dir_spec(self, tmp_path): return arvados.util._BaseDirectorySpec( 'TEST_DIRECTORY', 'XDG_TEST_HOME', Path('.test'), 'XDG_TEST_DIRS', f"{tmp_path / '.test1'}:{tmp_path / '.test2'}", ) @pytest.fixture def env(self, tmp_path): return {'HOME': str(tmp_path)} @pytest.fixture def umask(self): orig_umask = os.umask(0o002) try: yield finally: os.umask(orig_umask) def test_search_systemd_dirs(self, dir_spec, env, tmp_path): env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}' dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests') actual = list(dirs.search(self.SELF_PATH.name)) assert actual == [self.SELF_PATH] def test_search_xdg_home(self, dir_spec, env, tmp_path): env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent) dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests') actual = list(dirs.search(self.SELF_PATH.name)) assert actual == [self.SELF_PATH] def test_search_xdg_dirs(self, dir_spec, env, tmp_path): env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}' dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests') actual = list(dirs.search(self.SELF_PATH.name)) assert actual == [self.SELF_PATH] def test_search_all_dirs(self, dir_spec, env, tmp_path): env['TEST_DIRECTORY'] = f'{tmp_path}:{self.SELF_PATH.parent}' env['XDG_TEST_HOME'] = str(self.SELF_PATH.parent.parent) env['XDG_TEST_DIRS'] = f'{tmp_path}:{self.SELF_PATH.parent.parent}' dirs = arvados.util._BaseDirectories(dir_spec, env, 'tests') actual = list(dirs.search(self.SELF_PATH.name)) assert actual == [self.SELF_PATH, self.SELF_PATH, self.SELF_PATH] def test_search_default_home(self, dir_spec, env, tmp_path): expected = tmp_path / dir_spec.xdg_home_default / 'default_home' expected.parent.mkdir() expected.touch() dirs = arvados.util._BaseDirectories(dir_spec, env, '.') actual = list(dirs.search(expected.name)) assert actual == [expected] def test_search_default_dirs(self, dir_spec, env, tmp_path): _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':') expected = Path(default_dir, 'default_dirs') expected.parent.mkdir() expected.touch() dirs = arvados.util._BaseDirectories(dir_spec, env, '.') actual = list(dirs.search(expected.name)) assert actual == [expected] def test_search_no_default_dirs(self, dir_spec, env, tmp_path): dir_spec.xdg_dirs_key = None dir_spec.xdg_dirs_default = None for subdir in ['.test1', '.test2', dir_spec.xdg_home_default]: expected = tmp_path / subdir / 'no_dirs' expected.parent.mkdir() expected.touch() dirs = arvados.util._BaseDirectories(dir_spec, env, '.') actual = list(dirs.search(expected.name)) assert actual == [expected] def test_ignore_relative_directories(self, dir_spec, env, tmp_path): test_path = Path(*self.SELF_PATH.parts[-2:]) assert test_path.exists(), "test setup problem: need an existing file in a subdirectory of ." parent_path = str(test_path.parent) env['TEST_DIRECTORY'] = '.' env['XDG_TEST_HOME'] = parent_path env['XDG_TEST_DIRS'] = parent_path dirs = arvados.util._BaseDirectories(dir_spec, env, parent_path) assert not list(dirs.search(test_path.name)) def test_search_warns_nondefault_home(self, dir_spec, env, tmp_path, caplog): search_path = tmp_path / dir_spec.xdg_home_default / 'Search' / 'SearchConfig' search_path.parent.mkdir(parents=True) search_path.touch() env[dir_spec.xdg_home_key] = str(tmp_path / '.nonexistent') dirs = arvados.util._BaseDirectories(dir_spec, env, search_path.parent.name) results = list(dirs.search(search_path.name)) expect_msg = "{} was not found under your configured ${} ({}), but does exist at the default location ({})".format( Path(*search_path.parts[-2:]), dir_spec.xdg_home_key, env[dir_spec.xdg_home_key], Path(*search_path.parts[:-2]), ) assert caplog.messages assert any(msg.startswith(expect_msg) for msg in caplog.messages) assert not results def test_storage_path_systemd(self, dir_spec, env, tmp_path): expected = tmp_path / 'rwsystemd' expected.mkdir(0o700) env['TEST_DIRECTORY'] = str(expected) dirs = arvados.util._BaseDirectories(dir_spec, env) assert dirs.storage_path() == expected def test_storage_path_systemd_mixed_modes(self, dir_spec, env, tmp_path): rodir = tmp_path / 'rodir' rodir.mkdir(0o500) expected = tmp_path / 'rwdir' expected.mkdir(0o700) env['TEST_DIRECTORY'] = f'{rodir}:{expected}' dirs = arvados.util._BaseDirectories(dir_spec, env) assert dirs.storage_path() == expected def test_storage_path_xdg_home(self, dir_spec, env, tmp_path): expected = tmp_path / '.xdghome' / 'arvados' env['XDG_TEST_HOME'] = str(expected.parent) dirs = arvados.util._BaseDirectories(dir_spec, env) assert dirs.storage_path() == expected exp_mode = stat.S_IFDIR | stat.S_IWUSR assert (expected.stat().st_mode & exp_mode) == exp_mode def test_storage_path_default(self, dir_spec, env, tmp_path): expected = tmp_path / dir_spec.xdg_home_default / 'arvados' dirs = arvados.util._BaseDirectories(dir_spec, env) assert dirs.storage_path() == expected exp_mode = stat.S_IFDIR | stat.S_IWUSR assert (expected.stat().st_mode & exp_mode) == exp_mode @pytest.mark.parametrize('subdir,mode', [ ('str/dir', 0o750), (Path('sub', 'path'), 0o770), ]) def test_storage_path_subdir(self, dir_spec, env, umask, tmp_path, subdir, mode): expected = tmp_path / dir_spec.xdg_home_default / 'arvados' / subdir dirs = arvados.util._BaseDirectories(dir_spec, env) actual = dirs.storage_path(subdir, mode) assert actual == expected expect_mode = mode | stat.S_IFDIR actual_mode = actual.stat().st_mode assert (actual_mode & expect_mode) == expect_mode assert not (actual_mode & stat.S_IRWXO) def test_empty_xdg_home(self, dir_spec, env, tmp_path): env['XDG_TEST_HOME'] = '' expected = tmp_path / dir_spec.xdg_home_default / 'emptyhome' dirs = arvados.util._BaseDirectories(dir_spec, env, expected.name) assert dirs.storage_path() == expected def test_empty_xdg_dirs(self, dir_spec, env, tmp_path): env['XDG_TEST_DIRS'] = '' _, _, default_dir = dir_spec.xdg_dirs_default.rpartition(':') expected = Path(default_dir, 'empty_dirs') expected.parent.mkdir() expected.touch() dirs = arvados.util._BaseDirectories(dir_spec, env, '.') actual = list(dirs.search(expected.name)) assert actual == [expected] def test_spec_key_lookup(self): dirs = arvados.util._BaseDirectories('CACHE') assert dirs._spec.systemd_key == 'CACHE_DIRECTORY' assert dirs._spec.xdg_dirs_key is None def test_spec_enum_lookup(self): dirs = arvados.util._BaseDirectories(arvados.util._BaseDirectorySpecs.CONFIG) assert dirs._spec.systemd_key == 'CONFIGURATION_DIRECTORY'