21700: Install Bundler system-wide in Rails postinst
[arvados.git] / services / fuse / tests / test_mount_filters.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import collections
6 import itertools
7 import json
8 import re
9 import unittest
10
11 from pathlib import Path
12
13 from parameterized import parameterized
14
15 from arvados_fuse import fusedir
16
17 from .integration_test import IntegrationTest
18 from .mount_test_base import MountTestBase
19 from .run_test_server import fixture
20
21 _COLLECTIONS = fixture('collections')
22 _GROUPS = fixture('groups')
23 _LINKS = fixture('links')
24 _USERS = fixture('users')
25
26 class DirectoryFiltersTestCase(MountTestBase):
27     DEFAULT_ROOT_KWARGS = {
28         'enable_write': False,
29         'filters': [
30             ['collections.name', 'like', 'zzzzz-4zz18-%'],
31             # This matches both "A Project" (which we use as the test root)
32             # and "A Subproject" (which we assert is found under it).
33             ['groups.name', 'like', 'A %roject'],
34         ],
35     }
36     EXPECTED_PATHS = frozenset([
37         _COLLECTIONS['foo_collection_in_aproject']['name'],
38         _GROUPS['asubproject']['name'],
39     ])
40     CHECKED_PATHS = EXPECTED_PATHS.union([
41         _COLLECTIONS['collection_to_move_around_in_aproject']['name'],
42         _GROUPS['subproject_in_active_user_home_project_to_test_unique_key_violation']['name'],
43     ])
44
45     @parameterized.expand([
46         (fusedir.MagicDirectory, {}, _GROUPS['aproject']['uuid']),
47         (fusedir.ProjectDirectory, {'project_object': _GROUPS['aproject']}, '.'),
48         (fusedir.SharedDirectory, {'exclude': None}, Path(
49             '{first_name} {last_name}'.format_map(_USERS['active']),
50             _GROUPS['aproject']['name'],
51         )),
52     ])
53     def test_filtered_path_exists(self, root_class, root_kwargs, subdir):
54         root_kwargs = collections.ChainMap(root_kwargs, self.DEFAULT_ROOT_KWARGS)
55         self.make_mount(root_class, **root_kwargs)
56         dir_path = Path(self.mounttmp, subdir)
57         actual = frozenset(
58             basename
59             for basename in self.CHECKED_PATHS
60             if (dir_path / basename).exists()
61         )
62         self.assertEqual(
63             actual,
64             self.EXPECTED_PATHS,
65             "mount existence checks did not match expected results",
66         )
67
68     @parameterized.expand([
69         (fusedir.MagicDirectory, {}, _GROUPS['aproject']['uuid']),
70         (fusedir.ProjectDirectory, {'project_object': _GROUPS['aproject']}, '.'),
71         (fusedir.SharedDirectory, {'exclude': None}, Path(
72             '{first_name} {last_name}'.format_map(_USERS['active']),
73             _GROUPS['aproject']['name'],
74         )),
75     ])
76     def test_filtered_path_listing(self, root_class, root_kwargs, subdir):
77         root_kwargs = collections.ChainMap(root_kwargs, self.DEFAULT_ROOT_KWARGS)
78         self.make_mount(root_class, **root_kwargs)
79         actual = frozenset(path.name for path in Path(self.mounttmp, subdir).iterdir())
80         self.assertEqual(
81             actual & self.EXPECTED_PATHS,
82             self.EXPECTED_PATHS,
83             "mount listing did not include minimum matches",
84         )
85         extra = frozenset(
86             name
87             for name in actual
88             if not (name.startswith('zzzzz-4zz18-') or name.endswith('roject'))
89         )
90         self.assertFalse(
91             extra,
92             "mount listing included results outside filters",
93         )
94
95
96 class TagFiltersTestCase(MountTestBase):
97     COLL_UUID = _COLLECTIONS['foo_collection_in_aproject']['uuid']
98     TAG_NAME = _LINKS['foo_collection_tag']['name']
99
100     @parameterized.expand([
101         '=',
102         '!=',
103     ])
104     def test_tag_directory_filters(self, op):
105         self.make_mount(
106             fusedir.TagDirectory,
107             enable_write=False,
108             filters=[
109                 ['links.head_uuid', op, self.COLL_UUID],
110             ],
111             tag=self.TAG_NAME,
112         )
113         checked_path = Path(self.mounttmp, self.COLL_UUID)
114         self.assertEqual(checked_path.exists(), op == '=')
115
116     @parameterized.expand(itertools.product(
117         ['in', 'not in'],
118         ['=', '!='],
119     ))
120     def test_tags_directory_filters(self, coll_op, link_op):
121         self.make_mount(
122             fusedir.TagsDirectory,
123             enable_write=False,
124             filters=[
125                 ['links.head_uuid', coll_op, [self.COLL_UUID]],
126                 ['links.name', link_op, self.TAG_NAME],
127             ],
128         )
129         if link_op == '!=':
130             filtered_path = Path(self.mounttmp, self.TAG_NAME)
131         elif coll_op == 'not in':
132             # As of 2024-02-09, foo tag only applies to the single collection.
133             # If you filter it out via head_uuid, then it disappears completely
134             # from the TagsDirectory. Hence we set that tag directory as
135             # filtered_path. If any of this changes in the future,
136             # it would be fine to append self.COLL_UUID to filtered_path here.
137             filtered_path = Path(self.mounttmp, self.TAG_NAME)
138         else:
139             filtered_path = Path(self.mounttmp, self.TAG_NAME, self.COLL_UUID, 'foo', 'nonexistent')
140         expect_path = filtered_path.parent
141         self.assertTrue(
142             expect_path.exists(),
143             f"path not found but should exist: {expect_path}",
144         )
145         self.assertFalse(
146             filtered_path.exists(),
147             f"path was found but should be filtered out: {filtered_path}",
148         )
149
150
151 class FiltersIntegrationTest(IntegrationTest):
152     COLLECTIONS_BY_PROP = {
153         coll['properties']['MainFile']: coll
154         for coll in _COLLECTIONS.values()
155         if coll['owner_uuid'] == _GROUPS['fuse_filters_test_project']['uuid']
156     }
157     PROP_VALUES = list(COLLECTIONS_BY_PROP)
158
159     for test_n, query in enumerate(['foo', 'ba?']):
160         @IntegrationTest.mount([
161             '--filters', json.dumps([
162                 ['collections.properties.MainFile', 'like', query],
163             ]),
164             '--mount-by-pdh', 'by_pdh',
165             '--mount-by-id', 'by_id',
166             '--mount-home', 'home',
167         ])
168         def _test_func(self, query=query):
169             pdh_path = Path(self.mnt, 'by_pdh')
170             id_path = Path(self.mnt, 'by_id')
171             home_path = Path(self.mnt, 'home')
172             query_re = re.compile(query.replace('?', '.'))
173             for prop_val, coll in self.COLLECTIONS_BY_PROP.items():
174                 should_exist = query_re.fullmatch(prop_val) is not None
175                 for path in [
176                         pdh_path / coll['portable_data_hash'],
177                         id_path / coll['portable_data_hash'],
178                         id_path / coll['uuid'],
179                         home_path / coll['name'],
180                 ]:
181                     self.assertEqual(
182                         path.exists(),
183                         should_exist,
184                         f"{path} from MainFile={prop_val} exists!={should_exist}",
185                     )
186         exec(f"test_collection_properties_filters_{test_n} = _test_func")
187
188     for test_n, mount_opts in enumerate([
189             ['--home'],
190             ['--project', _GROUPS['aproject']['uuid']],
191     ]):
192         @IntegrationTest.mount([
193             '--filters', json.dumps([
194                 ['collections.name', 'like', 'zzzzz-4zz18-%'],
195                 ['groups.name', 'like', 'A %roject'],
196             ]),
197             *mount_opts,
198         ])
199         def _test_func(self, mount_opts=mount_opts):
200             root_path = Path(self.mnt)
201             root_depth = len(root_path.parts)
202             max_depth = 0
203             name_re = re.compile(r'(zzzzz-4zz18-.*|A .*roject)')
204             dir_queue = [root_path]
205             while dir_queue:
206                 root_path = dir_queue.pop()
207                 max_depth = max(max_depth, len(root_path.parts))
208                 for child in root_path.iterdir():
209                     if not child.is_dir():
210                         continue
211                     match = name_re.fullmatch(child.name)
212                     self.assertIsNotNone(
213                         match,
214                         "found directory with name that should've been filtered",
215                     )
216                     if not match.group(1).startswith('zzzzz-4zz18-'):
217                         dir_queue.append(child)
218             self.assertGreaterEqual(
219                 max_depth,
220                 root_depth + (2 if mount_opts[0] == '--home' else 1),
221                 "test descended fewer subdirectories than expected",
222             )
223         exec(f"test_multiple_name_filters_{test_n} = _test_func")