# Copyright (C) The Arvados Authors. All rights reserved. # # SPDX-License-Identifier: AGPL-3.0 import collections import itertools import json import re import unittest from pathlib import Path from parameterized import parameterized from arvados_fuse import fusedir from .integration_test import IntegrationTest from .mount_test_base import MountTestBase from .run_test_server import fixture _COLLECTIONS = fixture('collections') _GROUPS = fixture('groups') _LINKS = fixture('links') _USERS = fixture('users') class DirectoryFiltersTestCase(MountTestBase): DEFAULT_ROOT_KWARGS = { 'enable_write': False, 'filters': [ ['collections.name', 'like', 'zzzzz-4zz18-%'], # This matches both "A Project" (which we use as the test root) # and "A Subproject" (which we assert is found under it). ['groups.name', 'like', 'A %roject'], ], } EXPECTED_PATHS = frozenset([ _COLLECTIONS['foo_collection_in_aproject']['name'], _GROUPS['asubproject']['name'], ]) CHECKED_PATHS = EXPECTED_PATHS.union([ _COLLECTIONS['collection_to_move_around_in_aproject']['name'], _GROUPS['subproject_in_active_user_home_project_to_test_unique_key_violation']['name'], ]) @parameterized.expand([ (fusedir.MagicDirectory, {}, _GROUPS['aproject']['uuid']), (fusedir.ProjectDirectory, {'project_object': _GROUPS['aproject']}, '.'), (fusedir.SharedDirectory, {'exclude': None}, Path( '{first_name} {last_name}'.format_map(_USERS['active']), _GROUPS['aproject']['name'], )), ]) def test_filtered_path_exists(self, root_class, root_kwargs, subdir): root_kwargs = collections.ChainMap(root_kwargs, self.DEFAULT_ROOT_KWARGS) self.make_mount(root_class, **root_kwargs) dir_path = Path(self.mounttmp, subdir) actual = frozenset( basename for basename in self.CHECKED_PATHS if (dir_path / basename).exists() ) self.assertEqual( actual, self.EXPECTED_PATHS, "mount existence checks did not match expected results", ) @parameterized.expand([ (fusedir.MagicDirectory, {}, _GROUPS['aproject']['uuid']), (fusedir.ProjectDirectory, {'project_object': _GROUPS['aproject']}, '.'), (fusedir.SharedDirectory, {'exclude': None}, Path( '{first_name} {last_name}'.format_map(_USERS['active']), _GROUPS['aproject']['name'], )), ]) def test_filtered_path_listing(self, root_class, root_kwargs, subdir): root_kwargs = collections.ChainMap(root_kwargs, self.DEFAULT_ROOT_KWARGS) self.make_mount(root_class, **root_kwargs) actual = frozenset(path.name for path in Path(self.mounttmp, subdir).iterdir()) self.assertEqual( actual & self.EXPECTED_PATHS, self.EXPECTED_PATHS, "mount listing did not include minimum matches", ) extra = frozenset( name for name in actual if not (name.startswith('zzzzz-4zz18-') or name.endswith('roject')) ) self.assertFalse( extra, "mount listing included results outside filters", ) class TagFiltersTestCase(MountTestBase): COLL_UUID = _COLLECTIONS['foo_collection_in_aproject']['uuid'] TAG_NAME = _LINKS['foo_collection_tag']['name'] @parameterized.expand([ '=', '!=', ]) def test_tag_directory_filters(self, op): self.make_mount( fusedir.TagDirectory, enable_write=False, filters=[ ['links.head_uuid', op, self.COLL_UUID], ], tag=self.TAG_NAME, ) checked_path = Path(self.mounttmp, self.COLL_UUID) self.assertEqual(checked_path.exists(), op == '=') @parameterized.expand(itertools.product( ['in', 'not in'], ['=', '!='], )) def test_tags_directory_filters(self, coll_op, link_op): self.make_mount( fusedir.TagsDirectory, enable_write=False, filters=[ ['links.head_uuid', coll_op, [self.COLL_UUID]], ['links.name', link_op, self.TAG_NAME], ], ) if link_op == '!=': filtered_path = Path(self.mounttmp, self.TAG_NAME) elif coll_op == 'not in': # As of 2024-02-09, foo tag only applies to the single collection. # If you filter it out via head_uuid, then it disappears completely # from the TagsDirectory. Hence we set that tag directory as # filtered_path. If any of this changes in the future, # it would be fine to append self.COLL_UUID to filtered_path here. filtered_path = Path(self.mounttmp, self.TAG_NAME) else: filtered_path = Path(self.mounttmp, self.TAG_NAME, self.COLL_UUID, 'foo', 'nonexistent') expect_path = filtered_path.parent self.assertTrue( expect_path.exists(), f"path not found but should exist: {expect_path}", ) self.assertFalse( filtered_path.exists(), f"path was found but should be filtered out: {filtered_path}", ) class FiltersIntegrationTest(IntegrationTest): COLLECTIONS_BY_PROP = { coll['properties']['MainFile']: coll for coll in _COLLECTIONS.values() if coll['owner_uuid'] == _GROUPS['fuse_filters_test_project']['uuid'] } PROP_VALUES = list(COLLECTIONS_BY_PROP) for test_n, query in enumerate(['foo', 'ba?']): @IntegrationTest.mount([ '--filters', json.dumps([ ['collections.properties.MainFile', 'like', query], ]), '--mount-by-pdh', 'by_pdh', '--mount-by-id', 'by_id', '--mount-home', 'home', ]) def _test_func(self, query=query): pdh_path = Path(self.mnt, 'by_pdh') id_path = Path(self.mnt, 'by_id') home_path = Path(self.mnt, 'home') query_re = re.compile(query.replace('?', '.')) for prop_val, coll in self.COLLECTIONS_BY_PROP.items(): should_exist = query_re.fullmatch(prop_val) is not None for path in [ pdh_path / coll['portable_data_hash'], id_path / coll['portable_data_hash'], id_path / coll['uuid'], home_path / coll['name'], ]: self.assertEqual( path.exists(), should_exist, f"{path} from MainFile={prop_val} exists!={should_exist}", ) exec(f"test_collection_properties_filters_{test_n} = _test_func") for test_n, mount_opts in enumerate([ ['--home'], ['--project', _GROUPS['aproject']['uuid']], ]): @IntegrationTest.mount([ '--filters', json.dumps([ ['collections.name', 'like', 'zzzzz-4zz18-%'], ['groups.name', 'like', 'A %roject'], ]), *mount_opts, ]) def _test_func(self, mount_opts=mount_opts): root_path = Path(self.mnt) root_depth = len(root_path.parts) max_depth = 0 name_re = re.compile(r'(zzzzz-4zz18-.*|A .*roject)') dir_queue = [root_path] while dir_queue: root_path = dir_queue.pop() max_depth = max(max_depth, len(root_path.parts)) for child in root_path.iterdir(): if not child.is_dir(): continue match = name_re.fullmatch(child.name) self.assertIsNotNone( match, "found directory with name that should've been filtered", ) if not match.group(1).startswith('zzzzz-4zz18-'): dir_queue.append(child) self.assertGreaterEqual( max_depth, root_depth + (2 if mount_opts[0] == '--home' else 1), "test descended fewer subdirectories than expected", ) exec(f"test_multiple_name_filters_{test_n} = _test_func")