-import arvados
-import arvados.safeapi
-import arvados_fuse as fuse
-import glob
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+from __future__ import absolute_import
+from future.utils import viewitems
+from builtins import str
+from builtins import object
+from six import assertRegex
import json
import llfuse
+import logging
+import mock
import os
-import shutil
import subprocess
-import sys
-import tempfile
-import threading
import time
import unittest
-import logging
-import multiprocessing
-import run_test_server
-import mock
-from mount_test_base import MountTestBase
+import arvados
+import arvados_fuse as fuse
+from . import run_test_server
+
+from .integration_test import IntegrationTest
+from .mount_test_base import MountTestBase
logger = logging.getLogger('arvados.arv-mount')
+class AssertWithTimeout(object):
+ """Allow some time for an assertion to pass."""
+
+ def __init__(self, timeout=0):
+ self.timeout = timeout
+
+ def __iter__(self):
+ self.deadline = time.time() + self.timeout
+ self.done = False
+ return self
+
+ def __next__(self):
+ if self.done:
+ raise StopIteration
+ return self.attempt
+
+ def attempt(self, fn, *args, **kwargs):
+ try:
+ fn(*args, **kwargs)
+ except AssertionError:
+ if time.time() > self.deadline:
+ raise
+ time.sleep(0.1)
+ else:
+ self.done = True
+
+
class FuseMountTest(MountTestBase):
def setUp(self):
super(FuseMountTest, self).setUp()
cw.write("data 8")
cw.start_new_stream('edgecases')
- for f in ":/.../-/*/\x01\\/ ".split("/"):
+ for f in ":/.../-/*/ ".split("/"):
cw.start_new_file(f)
cw.write('x')
- for f in ":/.../-/*/\x01\\/ ".split("/"):
+ for f in ":/.../-/*/ ".split("/"):
cw.start_new_stream('edgecases/dirs/' + f)
cw.start_new_file('x/x')
cw.write('x')
self.assertDirContents('dir2', ['thing5.txt', 'thing6.txt', 'dir3'])
self.assertDirContents('dir2/dir3', ['thing7.txt', 'thing8.txt'])
self.assertDirContents('edgecases',
- "dirs/:/.../-/*/\x01\\/ ".split("/"))
+ "dirs/:/.../-/*/ ".split("/"))
self.assertDirContents('edgecases/dirs',
- ":/.../-/*/\x01\\/ ".split("/"))
+ ":/.../-/*/ ".split("/"))
files = {'thing1.txt': 'data 1',
'thing2.txt': 'data 2',
'dir2/dir3/thing7.txt': 'data 7',
'dir2/dir3/thing8.txt': 'data 8'}
- for k, v in files.items():
- with open(os.path.join(self.mounttmp, k)) as f:
- self.assertEqual(v, f.read())
-
-
-class FuseNoAPITest(MountTestBase):
- def setUp(self):
- super(FuseNoAPITest, self).setUp()
- keep = arvados.keep.KeepClient(local_store=self.keeptmp)
- self.file_data = "API-free text\n"
- self.file_loc = keep.put(self.file_data)
- self.coll_loc = keep.put(". {} 0:{}:api-free.txt\n".format(
- self.file_loc, len(self.file_data)))
-
- def runTest(self):
- self.make_mount(fuse.MagicDirectory)
- self.assertDirContents(self.coll_loc, ['api-free.txt'])
- with open(os.path.join(
- self.mounttmp, self.coll_loc, 'api-free.txt')) as keep_file:
- actual = keep_file.read(-1)
- self.assertEqual(self.file_data, actual)
+ for k, v in viewitems(files):
+ with open(os.path.join(self.mounttmp, k), 'rb') as f:
+ self.assertEqual(v, f.read().decode())
class FuseMagicTest(MountTestBase):
def setUp(self, api=None):
super(FuseMagicTest, self).setUp(api=api)
+ self.test_project = run_test_server.fixture('groups')['aproject']['uuid']
+ self.non_project_group = run_test_server.fixture('groups')['public']['uuid']
+ self.collection_in_test_project = run_test_server.fixture('collections')['foo_collection_in_aproject']['name']
+
cw = arvados.CollectionWriter()
cw.start_new_file('thing1.txt')
self.testcollection = cw.finish()
self.test_manifest = cw.manifest_text()
- self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
+ coll = self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
+ self.test_manifest_pdh = coll['portable_data_hash']
def runTest(self):
self.make_mount(fuse.MagicDirectory)
self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
arvados.util.uuid_pattern.match(fn)
for fn in mount_ls),
- "new FUSE MagicDirectory lists Collection")
+ "new FUSE MagicDirectory has no collections or projects")
self.assertDirContents(self.testcollection, ['thing1.txt'])
self.assertDirContents(os.path.join('by_id', self.testcollection),
['thing1.txt'])
+ self.assertIn(self.collection_in_test_project,
+ llfuse.listdir(os.path.join(self.mounttmp, self.test_project)))
+ self.assertIn(self.collection_in_test_project,
+ llfuse.listdir(os.path.join(self.mounttmp, 'by_id', self.test_project)))
+
mount_ls = llfuse.listdir(self.mounttmp)
self.assertIn('README', mount_ls)
self.assertIn(self.testcollection, mount_ls)
self.assertIn(self.testcollection,
llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
+ self.assertIn(self.test_project, mount_ls)
+ self.assertIn(self.test_project,
+ llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
+
+ with self.assertRaises(OSError):
+ llfuse.listdir(os.path.join(self.mounttmp, 'by_id', self.non_project_group))
files = {}
files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
- for k, v in files.items():
- with open(os.path.join(self.mounttmp, k)) as f:
- self.assertEqual(v, f.read())
+ for k, v in viewitems(files):
+ with open(os.path.join(self.mounttmp, k), 'rb') as f:
+ self.assertEqual(v, f.read().decode())
class FuseTagsTest(MountTestBase):
bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
self.tag_collection(bar_uuid, 'fuse_test_tag')
- time.sleep(1)
- self.assertIn('fuse_test_tag', llfuse.listdir(self.mounttmp))
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertIn, 'fuse_test_tag', llfuse.listdir(self.mounttmp))
self.assertDirContents('fuse_test_tag', [bar_uuid])
baz_uuid = run_test_server.fixture('collections')['baz_file']['uuid']
l = self.tag_collection(baz_uuid, 'fuse_test_tag')
- time.sleep(1)
- self.assertDirContents('fuse_test_tag', [bar_uuid, baz_uuid])
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid, baz_uuid])
self.api.links().delete(uuid=l['uuid']).execute()
- time.sleep(1)
- self.assertDirContents('fuse_test_tag', [bar_uuid])
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
+
+
+def fuseSharedTestHelper(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ # Double check that we can open and read objects in this folder as a file,
+ # and that its contents are what we expect.
+ baz_path = os.path.join(
+ mounttmp,
+ 'FUSE User',
+ 'FUSE Test Project',
+ 'collection in FUSE project',
+ 'baz')
+ with open(baz_path) as f:
+ self.assertEqual("baz", f.read())
+
+ # check mtime on collection
+ st = os.stat(baz_path)
+ try:
+ mtime = st.st_mtime_ns // 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1391448174)
+
+ # shared_dirs is a list of the directories exposed
+ # by fuse.SharedDirectory (i.e. any object visible
+ # to the current user)
+ shared_dirs = llfuse.listdir(mounttmp)
+ shared_dirs.sort()
+ self.assertIn('FUSE User', shared_dirs)
+
+ # fuse_user_objs is a list of the objects owned by the FUSE
+ # test user (which present as files in the 'FUSE User'
+ # directory)
+ fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+ fuse_user_objs.sort()
+ self.assertEqual(['FUSE Test Project', # project owned by user
+ 'collection #1 owned by FUSE', # collection owned by user
+ 'collection #2 owned by FUSE' # collection owned by user
+ ], fuse_user_objs)
+
+ # test_proj_files is a list of the files in the FUSE Test Project.
+ test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+ test_proj_files.sort()
+ self.assertEqual(['collection in FUSE project'
+ ], test_proj_files)
+ Test().runTest()
+
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
exclude=self.api.users().current().execute()['uuid'])
+ keep = arvados.keep.KeepClient()
+ keep.put("baz".encode())
- # shared_dirs is a list of the directories exposed
- # by fuse.SharedDirectory (i.e. any object visible
- # to the current user)
- shared_dirs = llfuse.listdir(self.mounttmp)
- shared_dirs.sort()
- self.assertIn('FUSE User', shared_dirs)
-
- # fuse_user_objs is a list of the objects owned by the FUSE
- # test user (which present as files in the 'FUSE User'
- # directory)
- fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
- fuse_user_objs.sort()
- self.assertEqual(['FUSE Test Project', # project owned by user
- 'collection #1 owned by FUSE', # collection owned by user
- 'collection #2 owned by FUSE', # collection owned by user
- 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
- ], fuse_user_objs)
-
- # test_proj_files is a list of the files in the FUSE Test Project.
- test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
- test_proj_files.sort()
- self.assertEqual(['collection in FUSE project',
- 'pipeline instance in FUSE project.pipelineInstance',
- 'pipeline template in FUSE project.pipelineTemplate'
- ], test_proj_files)
-
- # Double check that we can open and read objects in this folder as a file,
- # and that its contents are what we expect.
- pipeline_template_path = os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'FUSE Test Project',
- 'pipeline template in FUSE project.pipelineTemplate')
- with open(pipeline_template_path) as f:
- j = json.load(f)
- self.assertEqual("pipeline template in FUSE project", j['name'])
-
- # check mtime on template
- st = os.stat(pipeline_template_path)
- self.assertEqual(st.st_mtime, 1397493304)
-
- # check mtime on collection
- st = os.stat(os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'collection #1 owned by FUSE'))
- self.assertEqual(st.st_mtime, 1391448174)
+ self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
class FuseHomeTest(MountTestBase):
'anonymously_accessible_project']
found_in = 0
found_not_in = 0
- for name, item in run_test_server.fixture('collections').iteritems():
+ for name, item in viewitems(run_test_server.fixture('collections')):
if 'name' not in item:
pass
elif item['owner_uuid'] == public_project['uuid']:
self.assertEqual(["file1.txt"], d1)
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
+ assertRegex(self, collection2["manifest_text"],
r'\. d41d8cd98f00b204e9800998ecf8427e\+0\+A\S+ 0:0:file1\.txt$')
self.assertEqual(12, self.operations.read_counter.get())
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
+ assertRegex(self, collection2["manifest_text"],
r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
self.pool.apply(fuseUpdateFileTestHelper, (self.mounttmp,))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
+ assertRegex(self, collection2["manifest_text"],
r'\. daaef200ebb921e011e3ae922dd3266b\+11\+A\S+ 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:11:file1\.txt 22:1:file1\.txt$')
self.pool.apply(fuseMkdirTestHelper, (self.mounttmp,))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
+ assertRegex(self, collection2["manifest_text"],
r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
# Starting manifest
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
+ assertRegex(self, collection2["manifest_text"],
r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
self.pool.apply(fuseRmTestHelperDeleteFile, (self.mounttmp,))
- # Can't have empty directories :-( so manifest will be empty.
+ # Empty directories are represented by an empty file named "."
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertEqual(collection2["manifest_text"], "")
+ assertRegex(self, collection2["manifest_text"],
+ r'./testdir d41d8cd98f00b204e9800998ecf8427e\+0\+A\S+ 0:0:\\056\n')
self.pool.apply(fuseRmTestHelperRmdir, (self.mounttmp,))
# Starting manifest
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
+ assertRegex(self, collection2["manifest_text"],
r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
self.pool.apply(fuseMvFileTestHelperMoveFile, (self.mounttmp,))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
- r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
+ assertRegex(self, collection2["manifest_text"],
+ r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt\n\./testdir d41d8cd98f00b204e9800998ecf8427e\+0\+A\S+ 0:0:\\056\n')
def fuseRenameTestHelper(mounttmp):
# Starting manifest
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
+ assertRegex(self, collection2["manifest_text"],
r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
d1 = llfuse.listdir(os.path.join(self.mounttmp))
self.assertEqual(["file1.txt"], d1)
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertRegexpMatches(collection2["manifest_text"],
+ assertRegex(self, collection2["manifest_text"],
r'\./testdir2 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
with llfuse.lock:
m.new_collection(collection.api_response(), collection)
- self.operations.listen_for_events(self.api)
+ self.operations.listen_for_events()
d1 = llfuse.listdir(os.path.join(self.mounttmp))
self.assertEqual([], sorted(d1))
with collection2.open("file1.txt", "w") as f:
f.write("foo")
- time.sleep(1)
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertEqual, ["file1.txt"], llfuse.listdir(os.path.join(self.mounttmp)))
- # should show up via event bus notify
- d1 = llfuse.listdir(os.path.join(self.mounttmp))
- self.assertEqual(["file1.txt"], sorted(d1))
+class FuseDeleteProjectEventTest(MountTestBase):
+ def runTest(self):
+
+ aproject = self.api.groups().create(body={
+ "name": "aproject",
+ "group_class": "project"
+ }).execute()
+
+ bproject = self.api.groups().create(body={
+ "name": "bproject",
+ "group_class": "project",
+ "owner_uuid": aproject["uuid"]
+ }).execute()
+
+ self.make_mount(fuse.ProjectDirectory,
+ project_object=self.api.users().current().execute())
+
+ self.operations.listen_for_events()
+
+ d1 = llfuse.listdir(os.path.join(self.mounttmp, "aproject"))
+ self.assertEqual(["bproject"], sorted(d1))
+
+ self.api.groups().delete(uuid=bproject["uuid"]).execute()
+
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertEqual, [], llfuse.listdir(os.path.join(self.mounttmp, "aproject")))
def fuseFileConflictTestHelper(mounttmp):
with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
self.assertEqual(f.read(), "bar")
- self.assertRegexpMatches(d1[1],
+ assertRegex(self, d1[1],
r'file1\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~')
with open(os.path.join(mounttmp, d1[1]), "r") as f:
collection1.update()
collection2.update()
- self.assertRegexpMatches(collection1.manifest_text(), r"\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$")
+ assertRegex(self, collection1.manifest_text(), r"\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$")
self.assertEqual(collection2.manifest_text(), "")
self.pool.apply(fuseMvFileBetweenCollectionsTest2, (self.mounttmp,
collection2.update()
self.assertEqual(collection1.manifest_text(), "")
- self.assertRegexpMatches(collection2.manifest_text(), r"\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file2\.txt$")
+ assertRegex(self, collection2.manifest_text(), r"\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file2\.txt$")
collection1.stop_threads()
collection2.stop_threads()
collection1.update()
collection2.update()
- self.assertRegexpMatches(collection1.manifest_text(), r"\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$")
+ assertRegex(self, collection1.manifest_text(), r"\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$")
self.assertEqual(collection2.manifest_text(), "")
self.pool.apply(fuseMvDirBetweenCollectionsTest2, (self.mounttmp,
collection2.update()
self.assertEqual(collection1.manifest_text(), "")
- self.assertRegexpMatches(collection2.manifest_text(), r"\./testdir2 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$")
+ assertRegex(self, collection2.manifest_text(), r"\./testdir2 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$")
collection1.stop_threads()
collection2.stop_threads()
def setUp(self):
api = mock.MagicMock()
super(MagicDirApiError, self).setUp(api=api)
- api.collections().get().execute.side_effect = iter([Exception('API fail'), {"manifest_text": self.test_manifest}])
+ api.collections().get().execute.side_effect = iter([
+ Exception('API fail'),
+ {
+ "manifest_text": self.test_manifest,
+ "portable_data_hash": self.test_manifest_pdh,
+ },
+ ])
api.keep.get.side_effect = Exception('Keep fail')
def runTest(self):
- self.make_mount(fuse.MagicDirectory)
+ with mock.patch('arvados_fuse.fresh.FreshBase._poll_time', new_callable=mock.PropertyMock, return_value=60) as mock_poll_time:
+ self.make_mount(fuse.MagicDirectory)
- self.operations.inodes.inode_cache.cap = 1
- self.operations.inodes.inode_cache.min_entries = 2
+ self.operations.inodes.inode_cache.cap = 1
+ self.operations.inodes.inode_cache.min_entries = 2
- with self.assertRaises(OSError):
- llfuse.listdir(os.path.join(self.mounttmp, self.testcollection))
+ with self.assertRaises(OSError):
+ llfuse.listdir(os.path.join(self.mounttmp, self.testcollection))
- llfuse.listdir(os.path.join(self.mounttmp, self.testcollection))
+ llfuse.listdir(os.path.join(self.mounttmp, self.testcollection))
-class FuseUnitTest(unittest.TestCase):
+class SanitizeFilenameTest(MountTestBase):
def test_sanitize_filename(self):
+ pdir = fuse.ProjectDirectory(1, {}, self.api, 0, project_object=self.api.users().current().execute())
acceptable = [
"foo.txt",
".foo",
"//",
]
for f in acceptable:
- self.assertEqual(f, fuse.sanitize_filename(f))
+ self.assertEqual(f, pdir.sanitize_filename(f))
for f in unacceptable:
- self.assertNotEqual(f, fuse.sanitize_filename(f))
+ self.assertNotEqual(f, pdir.sanitize_filename(f))
# The sanitized filename should be the same length, though.
- self.assertEqual(len(f), len(fuse.sanitize_filename(f)))
+ self.assertEqual(len(f), len(pdir.sanitize_filename(f)))
# Special cases
- self.assertEqual("_", fuse.sanitize_filename(""))
- self.assertEqual("_", fuse.sanitize_filename("."))
- self.assertEqual("__", fuse.sanitize_filename(".."))
+ self.assertEqual("_", pdir.sanitize_filename(""))
+ self.assertEqual("_", pdir.sanitize_filename("."))
+ self.assertEqual("__", pdir.sanitize_filename(".."))
class FuseMagicTestPDHOnly(MountTestBase):
files = {}
files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
- for k, v in files.items():
- with open(os.path.join(self.mounttmp, k)) as f:
- self.assertEqual(v, f.read())
+ for k, v in viewitems(files):
+ with open(os.path.join(self.mounttmp, k), 'rb') as f:
+ self.assertEqual(v, f.read().decode())
# look up using uuid should fail when pdh_only is set
if pdh_only is True:
def test_with_default_by_id(self):
self.verify_pdh_only(skip_pdh_only=True)
+
+
+class SlashSubstitutionTest(IntegrationTest):
+ mnt_args = [
+ '--read-write',
+ '--mount-home', 'zzz',
+ ]
+
+ def setUp(self):
+ super(SlashSubstitutionTest, self).setUp()
+ self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+ self.api.config = lambda: {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+ self.testcoll = self.api.collections().create(body={"name": "foo/bar/baz"}).execute()
+ self.testcolleasy = self.api.collections().create(body={"name": "foo-bar-baz"}).execute()
+ self.fusename = 'foo[SLASH]bar[SLASH]baz'
+
+ @IntegrationTest.mount(argv=mnt_args)
+ @mock.patch('arvados.util.get_config_once')
+ def test_slash_substitution_before_listing(self, get_config_once):
+ get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+ self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
+ self.checkContents()
+ @staticmethod
+ def _test_slash_substitution_before_listing(self, tmpdir, fusename):
+ with open(os.path.join(tmpdir, 'foo-bar-baz', 'waz'), 'w') as f:
+ f.write('xxx')
+ with open(os.path.join(tmpdir, fusename, 'waz'), 'w') as f:
+ f.write('foo')
+
+ @IntegrationTest.mount(argv=mnt_args)
+ @mock.patch('arvados.util.get_config_once')
+ def test_slash_substitution_after_listing(self, get_config_once):
+ get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+ self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
+ self.checkContents()
+ @staticmethod
+ def _test_slash_substitution_after_listing(self, tmpdir, fusename):
+ with open(os.path.join(tmpdir, 'foo-bar-baz', 'waz'), 'w') as f:
+ f.write('xxx')
+ os.listdir(tmpdir)
+ with open(os.path.join(tmpdir, fusename, 'waz'), 'w') as f:
+ f.write('foo')
+
+ def checkContents(self):
+ self.assertRegexpMatches(self.api.collections().get(uuid=self.testcoll['uuid']).execute()['manifest_text'], ' acbd18db') # md5(foo)
+ self.assertRegexpMatches(self.api.collections().get(uuid=self.testcolleasy['uuid']).execute()['manifest_text'], ' f561aaf6') # md5(xxx)
+
+ @IntegrationTest.mount(argv=mnt_args)
+ @mock.patch('arvados.util.get_config_once')
+ def test_slash_substitution_conflict(self, get_config_once):
+ self.testcollconflict = self.api.collections().create(body={"name": self.fusename}).execute()
+ get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+ self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
+ self.assertRegexpMatches(self.api.collections().get(uuid=self.testcollconflict['uuid']).execute()['manifest_text'], ' acbd18db') # md5(foo)
+ # foo/bar/baz collection unchanged, because it is masked by foo[SLASH]bar[SLASH]baz
+ self.assertEqual(self.api.collections().get(uuid=self.testcoll['uuid']).execute()['manifest_text'], '')
+ @staticmethod
+ def _test_slash_substitution_conflict(self, tmpdir, fusename):
+ with open(os.path.join(tmpdir, fusename, 'waz'), 'w') as f:
+ f.write('foo')