Merge branch '16100-mime-types'
[arvados.git] / services / fuse / tests / test_mount.py
index f539b3f7d055d9e8753fe85e917777adca00dc4c..593d945cff0be54e46cb360712efd20b28e1658d 100644 (file)
@@ -20,6 +20,7 @@ import arvados
 import arvados_fuse as fuse
 from . import run_test_server
 
+from .integration_test import IntegrationTest
 from .mount_test_base import MountTestBase
 
 logger = logging.getLogger('arvados.arv-mount')
@@ -1098,8 +1099,9 @@ class MagicDirApiError(FuseMagicTest):
             llfuse.listdir(os.path.join(self.mounttmp, self.testcollection))
 
 
-class FuseUnitTest(unittest.TestCase):
+class SanitizeFilenameTest(MountTestBase):
     def test_sanitize_filename(self):
+        pdir = fuse.ProjectDirectory(1, {}, self.api, 0, project_object=self.api.users().current().execute())
         acceptable = [
             "foo.txt",
             ".foo",
@@ -1119,15 +1121,15 @@ class FuseUnitTest(unittest.TestCase):
             "//",
             ]
         for f in acceptable:
-            self.assertEqual(f, fuse.sanitize_filename(f))
+            self.assertEqual(f, pdir.sanitize_filename(f))
         for f in unacceptable:
-            self.assertNotEqual(f, fuse.sanitize_filename(f))
+            self.assertNotEqual(f, pdir.sanitize_filename(f))
             # The sanitized filename should be the same length, though.
-            self.assertEqual(len(f), len(fuse.sanitize_filename(f)))
+            self.assertEqual(len(f), len(pdir.sanitize_filename(f)))
         # Special cases
-        self.assertEqual("_", fuse.sanitize_filename(""))
-        self.assertEqual("_", fuse.sanitize_filename("."))
-        self.assertEqual("__", fuse.sanitize_filename(".."))
+        self.assertEqual("_", pdir.sanitize_filename(""))
+        self.assertEqual("_", pdir.sanitize_filename("."))
+        self.assertEqual("__", pdir.sanitize_filename(".."))
 
 
 class FuseMagicTestPDHOnly(MountTestBase):
@@ -1191,3 +1193,63 @@ class FuseMagicTestPDHOnly(MountTestBase):
 
     def test_with_default_by_id(self):
         self.verify_pdh_only(skip_pdh_only=True)
+
+
+class SlashSubstitutionTest(IntegrationTest):
+    mnt_args = [
+        '--read-write',
+        '--mount-home', 'zzz',
+    ]
+
+    def setUp(self):
+        super(SlashSubstitutionTest, self).setUp()
+        self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+        self.api.config = lambda: {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+        self.testcoll = self.api.collections().create(body={"name": "foo/bar/baz"}).execute()
+        self.testcolleasy = self.api.collections().create(body={"name": "foo-bar-baz"}).execute()
+        self.fusename = 'foo[SLASH]bar[SLASH]baz'
+
+    @IntegrationTest.mount(argv=mnt_args)
+    @mock.patch('arvados.util.get_config_once')
+    def test_slash_substitution_before_listing(self, get_config_once):
+        get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+        self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
+        self.checkContents()
+    @staticmethod
+    def _test_slash_substitution_before_listing(self, tmpdir, fusename):
+        with open(os.path.join(tmpdir, 'foo-bar-baz', 'waz'), 'w') as f:
+            f.write('xxx')
+        with open(os.path.join(tmpdir, fusename, 'waz'), 'w') as f:
+            f.write('foo')
+
+    @IntegrationTest.mount(argv=mnt_args)
+    @mock.patch('arvados.util.get_config_once')
+    def test_slash_substitution_after_listing(self, get_config_once):
+        get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+        self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
+        self.checkContents()
+    @staticmethod
+    def _test_slash_substitution_after_listing(self, tmpdir, fusename):
+        with open(os.path.join(tmpdir, 'foo-bar-baz', 'waz'), 'w') as f:
+            f.write('xxx')
+        os.listdir(tmpdir)
+        with open(os.path.join(tmpdir, fusename, 'waz'), 'w') as f:
+            f.write('foo')
+
+    def checkContents(self):
+        self.assertRegexpMatches(self.api.collections().get(uuid=self.testcoll['uuid']).execute()['manifest_text'], ' acbd18db') # md5(foo)
+        self.assertRegexpMatches(self.api.collections().get(uuid=self.testcolleasy['uuid']).execute()['manifest_text'], ' f561aaf6') # md5(xxx)
+
+    @IntegrationTest.mount(argv=mnt_args)
+    @mock.patch('arvados.util.get_config_once')
+    def test_slash_substitution_conflict(self, get_config_once):
+        self.testcollconflict = self.api.collections().create(body={"name": self.fusename}).execute()
+        get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+        self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
+        self.assertRegexpMatches(self.api.collections().get(uuid=self.testcollconflict['uuid']).execute()['manifest_text'], ' acbd18db') # md5(foo)
+        # foo/bar/baz collection unchanged, because it is masked by foo[SLASH]bar[SLASH]baz
+        self.assertEqual(self.api.collections().get(uuid=self.testcoll['uuid']).execute()['manifest_text'], '')
+    @staticmethod
+    def _test_slash_substitution_conflict(self, tmpdir, fusename):
+        with open(os.path.join(tmpdir, fusename, 'waz'), 'w') as f:
+            f.write('foo')