3 import arvados_fuse as fuse
12 import run_test_server
15 class MountTestBase(unittest.TestCase):
17 self.keeptmp = tempfile.mkdtemp()
18 os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
19 self.mounttmp = tempfile.mkdtemp()
20 run_test_server.run(False)
21 run_test_server.authorize_with("admin")
22 self.api = api = fuse.SafeApi(arvados.config)
25 run_test_server.stop()
27 # llfuse.close is buggy, so use fusermount instead.
28 #llfuse.close(unmount=True)
31 while (count < 9 and success != 0):
32 success = subprocess.call(["fusermount", "-u", self.mounttmp])
36 os.rmdir(self.mounttmp)
37 shutil.rmtree(self.keeptmp)
40 class FuseMountTest(MountTestBase):
42 super(FuseMountTest, self).setUp()
44 cw = arvados.CollectionWriter()
46 cw.start_new_file('thing1.txt')
48 cw.start_new_file('thing2.txt')
50 cw.start_new_stream('dir1')
52 cw.start_new_file('thing3.txt')
54 cw.start_new_file('thing4.txt')
57 cw.start_new_stream('dir2')
58 cw.start_new_file('thing5.txt')
60 cw.start_new_file('thing6.txt')
63 cw.start_new_stream('dir2/dir3')
64 cw.start_new_file('thing7.txt')
67 cw.start_new_file('thing8.txt')
70 cw.start_new_stream('edgecases')
71 for f in ":/./../.../-/*/\x01\\/ ".split("/"):
75 for f in ":/../.../-/*/\x01\\/ ".split("/"):
76 cw.start_new_stream('edgecases/dirs/' + f)
77 cw.start_new_file('x/x')
80 self._utf8 = ["\xe2\x9c\x8c", # victory sign
81 "\xe2\x9b\xb5", # sailboat
82 # "\xf0\x9f\x98\xb1", # scream. doesn't work!!
84 cw.start_new_stream('edgecases/utf8')
89 self.testcollection = cw.finish()
90 self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
92 def assertDirContents(self, subdir, expect_content):
95 path = os.path.join(path, subdir)
96 self.assertEqual(sorted([fn.decode('utf-8') for fn in expect_content]),
97 sorted([fn.decode('utf-8') for fn in os.listdir(path)]))
100 # Create the request handler
101 operations = fuse.Operations(os.getuid(), os.getgid())
102 e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.testcollection))
104 llfuse.init(operations, self.mounttmp, [])
105 t = threading.Thread(None, lambda: llfuse.main())
108 # wait until the driver is finished initializing
109 operations.initlock.wait()
111 # now check some stuff
112 self.assertDirContents(None, ['thing1.txt', 'thing2.txt',
113 'edgecases', 'dir1', 'dir2'])
114 self.assertDirContents('dir1', ['thing3.txt', 'thing4.txt'])
115 self.assertDirContents('dir2', ['thing5.txt', 'thing6.txt', 'dir3'])
116 self.assertDirContents('dir2/dir3', ['thing7.txt', 'thing8.txt'])
117 self.assertDirContents('edgecases',
118 "dirs/utf8/:/_/__/.../-/*/\x01\\/ ".split("/"))
119 self.assertDirContents('edgecases/dirs',
120 ":/__/.../-/*/\x01\\/ ".split("/"))
121 self.assertDirContents('edgecases/utf8', self._utf8)
123 files = {'thing1.txt': 'data 1',
124 'thing2.txt': 'data 2',
125 'dir1/thing3.txt': 'data 3',
126 'dir1/thing4.txt': 'data 4',
127 'dir2/thing5.txt': 'data 5',
128 'dir2/thing6.txt': 'data 6',
129 'dir2/dir3/thing7.txt': 'data 7',
130 'dir2/dir3/thing8.txt': 'data 8'}
132 for k, v in files.items():
133 with open(os.path.join(self.mounttmp, k)) as f:
134 self.assertEqual(v, f.read())
137 class FuseMagicTest(MountTestBase):
139 super(FuseMagicTest, self).setUp()
141 cw = arvados.CollectionWriter()
143 cw.start_new_file('thing1.txt')
146 self.testcollection = cw.finish()
147 self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
150 # Create the request handler
151 operations = fuse.Operations(os.getuid(), os.getgid())
152 e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
154 self.mounttmp = tempfile.mkdtemp()
156 llfuse.init(operations, self.mounttmp, [])
157 t = threading.Thread(None, lambda: llfuse.main())
160 # wait until the driver is finished initializing
161 operations.initlock.wait()
163 # now check some stuff
164 d1 = os.listdir(self.mounttmp)
166 self.assertEqual(['README'], d1)
168 d2 = os.listdir(os.path.join(self.mounttmp, self.testcollection))
170 self.assertEqual(['thing1.txt'], d2)
172 d3 = os.listdir(self.mounttmp)
174 self.assertEqual([self.testcollection, 'README'], d3)
177 files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
179 for k, v in files.items():
180 with open(os.path.join(self.mounttmp, k)) as f:
181 self.assertEqual(v, f.read())
184 class FuseTagsTest(MountTestBase):
186 operations = fuse.Operations(os.getuid(), os.getgid())
187 e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
189 llfuse.init(operations, self.mounttmp, [])
190 t = threading.Thread(None, lambda: llfuse.main())
193 # wait until the driver is finished initializing
194 operations.initlock.wait()
196 d1 = os.listdir(self.mounttmp)
198 self.assertEqual(['foo_tag'], d1)
200 d2 = os.listdir(os.path.join(self.mounttmp, 'foo_tag'))
202 self.assertEqual(['zzzzz-4zz18-fy296fx3hot09f7'], d2)
204 d3 = os.listdir(os.path.join(self.mounttmp, 'foo_tag', 'zzzzz-4zz18-fy296fx3hot09f7'))
206 self.assertEqual(['foo'], d3)
209 class FuseTagsUpdateTest(MountTestBase):
210 def runRealTest(self):
211 operations = fuse.Operations(os.getuid(), os.getgid())
212 e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, poll_time=1))
214 llfuse.init(operations, self.mounttmp, [])
215 t = threading.Thread(None, lambda: llfuse.main())
218 # wait until the driver is finished initializing
219 operations.initlock.wait()
221 d1 = os.listdir(self.mounttmp)
223 self.assertEqual(['foo_tag'], d1)
225 self.api.links().create(body={'link': {
226 'head_uuid': 'fa7aeb5140e2848d39b416daeef4ffc5+45',
233 d2 = os.listdir(self.mounttmp)
235 self.assertEqual(['bar_tag', 'foo_tag'], d2)
237 d3 = os.listdir(os.path.join(self.mounttmp, 'bar_tag'))
239 self.assertEqual(['fa7aeb5140e2848d39b416daeef4ffc5+45'], d3)
241 l = self.api.links().create(body={'link': {
242 'head_uuid': 'ea10d51bcf88862dbcc36eb292017dfd+45',
249 d4 = os.listdir(os.path.join(self.mounttmp, 'bar_tag'))
251 self.assertEqual(['ea10d51bcf88862dbcc36eb292017dfd+45', 'fa7aeb5140e2848d39b416daeef4ffc5+45'], d4)
253 self.api.links().delete(uuid=l['uuid']).execute()
257 d5 = os.listdir(os.path.join(self.mounttmp, 'bar_tag'))
259 self.assertEqual(['fa7aeb5140e2848d39b416daeef4ffc5+45'], d5)
262 class FuseSharedTest(MountTestBase):
264 operations = fuse.Operations(os.getuid(), os.getgid())
265 e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()['uuid']))
267 llfuse.init(operations, self.mounttmp, [])
268 t = threading.Thread(None, lambda: llfuse.main())
271 # wait until the driver is finished initializing
272 operations.initlock.wait()
274 # shared_dirs is a list of the directories exposed
275 # by fuse.SharedDirectory (i.e. any object visible
276 # to the current user)
277 shared_dirs = os.listdir(self.mounttmp)
279 self.assertIn('FUSE User', shared_dirs)
281 # fuse_user_objs is a list of the objects owned by the FUSE
282 # test user (which present as files in the 'FUSE User'
284 fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
285 fuse_user_objs.sort()
286 self.assertEqual(['Empty collection.link', # permission link on collection
287 'FUSE Test Project', # project owned by user
288 'collection #1 owned by FUSE', # collection owned by user
289 'collection #2 owned by FUSE', # collection owned by user
290 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
293 # test_proj_files is a list of the files in the FUSE Test Project.
294 test_proj_files = os.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
295 test_proj_files.sort()
296 self.assertEqual(['collection in FUSE project',
297 'pipeline instance in FUSE project.pipelineInstance',
298 'pipeline template in FUSE project.pipelineTemplate'
301 # Double check that we can open and read objects in this folder as a file,
302 # and that its contents are what we expect.
303 with open(os.path.join(
307 'pipeline template in FUSE project.pipelineTemplate')) as f:
309 self.assertEqual("pipeline template in FUSE project", j['name'])
312 class FuseHomeTest(MountTestBase):
314 operations = fuse.Operations(os.getuid(), os.getgid())
315 e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()))
317 llfuse.init(operations, self.mounttmp, [])
318 t = threading.Thread(None, lambda: llfuse.main())
321 # wait until the driver is finished initializing
322 operations.initlock.wait()
324 d1 = os.listdir(self.mounttmp)
326 self.assertIn('Unrestricted public data', d1)
328 d2 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data'))
330 self.assertEqual(['GNU General Public License, version 3'], d2)
332 d3 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
334 self.assertEqual(["GNU_General_Public_License,_version_3.pdf"], d3)
337 class FuseUnitTest(unittest.TestCase):
338 def test_sanitize_filename(self):
358 self.assertEqual(f, fuse.sanitize_filename(f))
359 for f in unacceptable:
360 self.assertNotEqual(f, fuse.sanitize_filename(f))
361 # The sanitized filename should be the same length, though.
362 self.assertEqual(len(f), len(fuse.sanitize_filename(f)))
364 self.assertEqual("_", fuse.sanitize_filename(""))
365 self.assertEqual("_", fuse.sanitize_filename("."))
366 self.assertEqual("__", fuse.sanitize_filename(".."))
367 self.assertEqual("__", fuse.sanitize_filename(".."))