3 import arvados_fuse as fuse
12 import run_test_server
# Base class for the FUSE mount tests in this file.
# NOTE(review): this listing omits some original lines (the embedded line
# numbers skip), so the `def` header for the statements below is not
# visible -- presumably setUp(); confirm against the full file.
15 class MountTestBase(unittest.TestCase):
# Fresh temp directory whose path is exported through KEEP_LOCAL_STORE.
17 self.keeptmp = tempfile.mkdtemp()
18 os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
# Second temp directory; make_mount() hands it to llfuse.init() as the
# mount point.
19 self.mounttmp = tempfile.mkdtemp()
# Calls into run_test_server before the API client is built.
# NOTE(review): the meaning of the False argument is not visible here.
20 run_test_server.run(False)
21 run_test_server.authorize_with("admin")
# self.api is what make_mount() passes to the directory classes.  The
# local name `api` is bound as well but is not used on any visible line.
22 self.api = api = fuse.SafeApi(arvados.config)
def make_mount(self, root_class, *root_args):
    """Mount a FUSE filesystem on ``self.mounttmp`` and wait until it is ready.

    ``root_class`` is instantiated with ``llfuse.ROOT_INODE``, the inode
    table of a new ``fuse.Operations`` object, ``self.api``, the literal
    ``0`` and then any extra ``root_args``; the instance is registered
    with ``inodes.add_entry``.  ``llfuse.main`` runs on a separate thread,
    and this method blocks on ``initlock`` before returning.
    """
    ops = fuse.Operations(os.getuid(), os.getgid())
    root_dir = root_class(
        llfuse.ROOT_INODE, ops.inodes, self.api, 0, *root_args)
    ops.inodes.add_entry(root_dir)
    llfuse.init(ops, self.mounttmp, [])
    fuse_thread = threading.Thread(target=llfuse.main)
    fuse_thread.start()
    # Do not return until the driver has signalled that it finished
    # initializing.
    ops.initlock.wait()
# NOTE(review): the `def` header for these statements is not visible in
# this listing -- presumably tearDown(); confirm against the full file.
# Visible cleanup steps: stop the test server, unmount with fusermount,
# then delete both temp directories.
34 run_test_server.stop()
36 # llfuse.close is buggy, so use fusermount instead.
37 #llfuse.close(unmount=True)
# Keep calling `fusermount -u` while it returns non-zero and count < 9.
# NOTE(review): the initialisation and update of `count` and `success`
# (and any delay between attempts) are on lines not shown here.
40 while (count < 9 and success != 0):
41 success = subprocess.call(["fusermount", "-u", self.mounttmp])
# os.rmdir only removes an empty directory; the Keep store directory is
# removed recursively instead.
45 os.rmdir(self.mounttmp)
46 shutil.rmtree(self.keeptmp)
# Assert that the names returned by os.listdir(path) equal expect_content.
# Both sides are sorted first, so ordering does not matter.
# NOTE(review): the lines that initialise `path` are not shown in this
# listing.  Some callers pass subdir=None, so the join below is presumably
# guarded by a conditional and `path` presumably starts at the mount
# point -- confirm against the full file.
48 def assertDirContents(self, subdir, expect_content):
51 path = os.path.join(path, subdir)
52 self.assertEqual(sorted(expect_content), sorted(os.listdir(path)))
# Tests a fuse.CollectionDirectory mount of a collection assembled with
# arvados.CollectionWriter.
# NOTE(review): this listing skips some original lines (method headers and
# the statements between the start_new_file calls are not visible); the
# comments below describe only what is shown.
55 class FuseMountTest(MountTestBase):
57 super(FuseMountTest, self).setUp()
# Layout built below: two top-level files, then streams dir1, dir2 and
# dir2/dir3 with two files each.
59 cw = arvados.CollectionWriter()
61 cw.start_new_file('thing1.txt')
63 cw.start_new_file('thing2.txt')
65 cw.start_new_stream('dir1')
67 cw.start_new_file('thing3.txt')
69 cw.start_new_file('thing4.txt')
72 cw.start_new_stream('dir2')
73 cw.start_new_file('thing5.txt')
75 cw.start_new_file('thing6.txt')
78 cw.start_new_stream('dir2/dir3')
79 cw.start_new_file('thing7.txt')
82 cw.start_new_file('thing8.txt')
# Awkward names: each string is split on "/" so every piece is one name.
# NOTE(review): the body of the first loop is not shown -- presumably each
# piece is used as a file name inside 'edgecases'; confirm.
85 cw.start_new_stream('edgecases')
86 for f in ":/./../.../-/*/\x01\\/ ".split("/"):
# The second list has no "." entry; each piece becomes a stream under
# edgecases/dirs holding a file started as 'x/x'.
90 for f in ":/../.../-/*/\x01\\/ ".split("/"):
91 cw.start_new_stream('edgecases/dirs/' + f)
92 cw.start_new_file('x/x')
# The value returned by cw.finish() is what gets mounted; the manifest
# text is also submitted through the collections API.
95 self.testcollection = cw.finish()
96 self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
# Mount the collection and check the directory listings.
99 self.make_mount(fuse.CollectionDirectory, self.testcollection)
101 self.assertDirContents(None, ['thing1.txt', 'thing2.txt',
102 'edgecases', 'dir1', 'dir2'])
103 self.assertDirContents('dir1', ['thing3.txt', 'thing4.txt'])
104 self.assertDirContents('dir2', ['thing5.txt', 'thing6.txt', 'dir3'])
105 self.assertDirContents('dir2/dir3', ['thing7.txt', 'thing8.txt'])
# The expected names differ from the names written above: "." is expected
# to show up as "_" and ".." as "__".
106 self.assertDirContents('edgecases',
107 "dirs/:/_/__/.../-/*/\x01\\/ ".split("/"))
108 self.assertDirContents('edgecases/dirs',
109 ":/__/.../-/*/\x01\\/ ".split("/"))
# Expected contents of each regular file, keyed by path relative to the
# mount point.
111 files = {'thing1.txt': 'data 1',
112 'thing2.txt': 'data 2',
113 'dir1/thing3.txt': 'data 3',
114 'dir1/thing4.txt': 'data 4',
115 'dir2/thing5.txt': 'data 5',
116 'dir2/thing6.txt': 'data 6',
117 'dir2/dir3/thing7.txt': 'data 7',
118 'dir2/dir3/thing8.txt': 'data 8'}
# Read every file back through the mount and compare with the expectation.
120 for k, v in files.items():
121 with open(os.path.join(self.mounttmp, k)) as f:
122 self.assertEqual(v, f.read())
# Tests a fuse.MagicDirectory mount.
# NOTE(review): this listing skips some original lines (method headers,
# the `for` clause of the generator expression and the initialisation of
# `files` are not visible); comments describe only what is shown.
125 class FuseMagicTest(MountTestBase):
127 super(FuseMagicTest, self).setUp()
# Single-file collection, submitted through the collections API.
129 cw = arvados.CollectionWriter()
131 cw.start_new_file('thing1.txt')
134 self.testcollection = cw.finish()
135 self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
138 self.make_mount(fuse.MagicDirectory)
# On a fresh mount the listing must contain README and no name matching
# the Keep locator or uuid patterns.
140 mount_ls = os.listdir(self.mounttmp)
141 self.assertIn('README', mount_ls)
142 self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
143 arvados.util.uuid_pattern.match(fn)
145 "new FUSE MagicDirectory lists Collection")
# List the collection by its identifier, at the top level and under by_id.
146 self.assertDirContents(self.testcollection, ['thing1.txt'])
147 self.assertDirContents(os.path.join('by_id', self.testcollection),
# After those lookups the collection is expected to appear in the listing
# of the mount root and of by_id.
149 mount_ls = os.listdir(self.mounttmp)
150 self.assertIn('README', mount_ls)
151 self.assertIn(self.testcollection, mount_ls)
152 self.assertIn(self.testcollection,
153 os.listdir(os.path.join(self.mounttmp, 'by_id')))
# The key stored here already starts with self.mounttmp.  mkdtemp() returns
# an absolute path, and os.path.join discards earlier components when a
# later one is absolute, so the join inside the loop leaves the key as is.
156 files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
158 for k, v in files.items():
159 with open(os.path.join(self.mounttmp, k)) as f:
160 self.assertEqual(v, f.read())
# Tests a fuse.TagsDirectory mount by listing three levels:
# tag name -> collection uuid -> file name.
# NOTE(review): the method header is on a line not shown in this listing.
163 class FuseTagsTest(MountTestBase):
165 self.make_mount(fuse.TagsDirectory)
# Mount root: exactly one entry, the tag 'foo_tag'.
167 d1 = os.listdir(self.mounttmp)
169 self.assertEqual(['foo_tag'], d1)
# Inside the tag: exactly one collection uuid.
171 d2 = os.listdir(os.path.join(self.mounttmp, 'foo_tag'))
173 self.assertEqual(['zzzzz-4zz18-fy296fx3hot09f7'], d2)
# Inside the collection: exactly one file, 'foo'.
175 d3 = os.listdir(os.path.join(self.mounttmp, 'foo_tag', 'zzzzz-4zz18-fy296fx3hot09f7'))
177 self.assertEqual(['foo'], d3)
# Tests that a fuse.TagsDirectory mount created with poll_time=1 reflects
# links that are created and deleted through the API after mounting.
# NOTE(review): this listing skips some original lines; the test method
# header, the start of thread `t` and any waits between an API change and
# the following assertion are not visible here.
180 class FuseTagsUpdateTest(MountTestBase):
# Helper that creates a link through the API with head_uuid=coll_uuid.
# NOTE(review): the rest of the request body (presumably where tag_name is
# used) and the end of the call are on lines not shown.
181 def tag_collection(self, coll_uuid, tag_name):
182 return self.api.links().create(
183 body={'link': {'head_uuid': coll_uuid,
# The mount is assembled by hand here rather than through make_mount(),
# presumably because poll_time is passed as a keyword argument -- confirm.
189 operations = fuse.Operations(os.getuid(), os.getgid())
190 e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, poll_time=1))
192 llfuse.init(operations, self.mounttmp, [])
193 t = threading.Thread(None, lambda: llfuse.main())
196 # wait until the driver is finished initializing
197 operations.initlock.wait()
198 self.assertIn('foo_tag', os.listdir(self.mounttmp))
# Tag the bar_file fixture collection; the new tag directory must appear
# and contain only that collection.
200 bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
201 self.tag_collection(bar_uuid, 'fuse_test_tag')
203 self.assertIn('fuse_test_tag', os.listdir(self.mounttmp))
204 self.assertDirContents('fuse_test_tag', [bar_uuid])
# Tag baz_file with the same tag, keeping the returned link so it can be
# deleted afterwards.
206 baz_uuid = run_test_server.fixture('collections')['baz_file']['uuid']
207 l = self.tag_collection(baz_uuid, 'fuse_test_tag')
209 self.assertDirContents('fuse_test_tag', [bar_uuid, baz_uuid])
# Delete the baz_file link; only bar_file should remain under the tag.
211 self.api.links().delete(uuid=l['uuid']).execute()
213 self.assertDirContents('fuse_test_tag', [bar_uuid])
# Tests a fuse.SharedDirectory mount created with the current user's uuid.
# NOTE(review): this listing skips some original lines (the method header,
# the closing parts of the expected lists and the assignment of `j` are not
# visible); comments describe only what is shown.
216 class FuseSharedTest(MountTestBase):
218 self.make_mount(fuse.SharedDirectory,
219 self.api.users().current().execute()['uuid'])
221 # shared_dirs is a list of the directories exposed
222 # by fuse.SharedDirectory (i.e. any object visible
223 # to the current user)
224 shared_dirs = os.listdir(self.mounttmp)
226 self.assertIn('FUSE User', shared_dirs)
228 # fuse_user_objs is a list of the objects owned by the FUSE
229 # test user (which present as files in the 'FUSE User'
231 fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
# The listing is sorted in place so it can be compared with a fixed list.
232 fuse_user_objs.sort()
233 self.assertEqual(['Empty collection.link', # permission link on collection
234 'FUSE Test Project', # project owned by user
235 'collection #1 owned by FUSE', # collection owned by user
236 'collection #2 owned by FUSE', # collection owned by user
237 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
240 # test_proj_files is a list of the files in the FUSE Test Project.
241 test_proj_files = os.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
242 test_proj_files.sort()
243 self.assertEqual(['collection in FUSE project',
244 'pipeline instance in FUSE project.pipelineInstance',
245 'pipeline template in FUSE project.pipelineTemplate'
248 # Double check that we can open and read objects in this folder as a file,
249 # and that its contents are what we expect.
# NOTE(review): `j` is assigned on a line not shown -- presumably decoded
# from the contents of `f`; confirm against the full file.
250 with open(os.path.join(
254 'pipeline template in FUSE project.pipelineTemplate')) as f:
256 self.assertEqual("pipeline template in FUSE project", j['name'])
# Tests a fuse.ProjectDirectory mount built from the current user record.
# The whole result of users().current().execute() is passed here, whereas
# FuseSharedTest passes only its 'uuid' field.
# NOTE(review): the method header is on a line not shown in this listing.
259 class FuseHomeTest(MountTestBase):
261 self.make_mount(fuse.ProjectDirectory,
262 self.api.users().current().execute())
# Mount root must include the 'Unrestricted public data' entry.
264 d1 = os.listdir(self.mounttmp)
265 self.assertIn('Unrestricted public data', d1)
# That directory holds exactly one entry.
267 d2 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data'))
268 self.assertEqual(['GNU General Public License, version 3'], d2)
# The file name one level down uses underscores where the entry above has
# spaces.
270 d3 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
271 self.assertEqual(["GNU_General_Public_License,_version_3.pdf"], d3)
# Tests for fuse.sanitize_filename.  The class derives from
# unittest.TestCase directly, not from MountTestBase.
# NOTE(review): this listing skips the lines that define `unacceptable`
# and the loop header (and its list) for the first assertion below.
274 class FuseUnitTest(unittest.TestCase):
275 def test_sanitize_filename(self):
# Names expected to come back unchanged.
295 self.assertEqual(f, fuse.sanitize_filename(f))
# Names expected to be altered.
296 for f in unacceptable:
297 self.assertNotEqual(f, fuse.sanitize_filename(f))
298 # The sanitized filename should be the same length, though.
299 self.assertEqual(len(f), len(fuse.sanitize_filename(f)))
# Special cases: the empty string and "." both map to "_"; ".." maps to "__".
301 self.assertEqual("_", fuse.sanitize_filename(""))
302 self.assertEqual("_", fuse.sanitize_filename("."))
303 self.assertEqual("__", fuse.sanitize_filename(".."))