3 import arvados_fuse as fuse
12 import run_test_server
# Shared fixture for the FUSE mount tests: prepares a Keep temp directory, a
# mount point and an API wrapper, and provides a directory-listing assertion.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# prefix on each line and omits some lines (the method headers among them),
# so the comments here describe only the statements that are visible.
15 class MountTestBase(unittest.TestCase):
# Temporary directory exported to the environment as KEEP_LOCAL_STORE.
17 self.keeptmp = tempfile.mkdtemp()
18 os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
# Temporary directory that the tests pass to llfuse.init as the mount point.
19 self.mounttmp = tempfile.mkdtemp()
# Run the test server helper and authorize as "admin" before building the
# SafeApi wrapper stored on self.api.
20 run_test_server.run(False)
21 run_test_server.authorize_with("admin")
22 self.api = api = fuse.SafeApi(arvados.config)
25 run_test_server.stop()
27 # llfuse.close is buggy, so use fusermount instead.
28 #llfuse.close(unmount=True)
# Keep calling `fusermount -u` on the mount point while it exits non-zero and
# count is below 9 (the lines that initialise and advance count are not
# shown here).
31 while (count < 9 and success != 0):
32 success = subprocess.call(["fusermount", "-u", self.mounttmp])
# Remove the mount point directory and the whole Keep temp tree.
36 os.rmdir(self.mounttmp)
37 shutil.rmtree(self.keeptmp)
# Assert that the listing of a directory equals expect_content, ignoring
# order (both sides are sorted). subdir is joined onto path; where path gets
# its starting value is on a line not shown here.
39 def assertDirContents(self, subdir, expect_content):
42 path = os.path.join(path, subdir)
43 self.assertEqual(sorted(expect_content), sorted(os.listdir(path)))
# Mounts a single collection with fuse.CollectionDirectory and checks the
# directory listings and file contents seen through the mount point.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# prefix on each line and omits some lines (method headers, the calls that
# write file data, the thread start), so only visible statements are described.
46 class FuseMountTest(MountTestBase):
48 super(FuseMountTest, self).setUp()
# Build the test collection: thing1/thing2 in the first stream, then the
# streams dir1, dir2, dir2/dir3 and edgecases, each followed by its files.
50 cw = arvados.CollectionWriter()
52 cw.start_new_file('thing1.txt')
54 cw.start_new_file('thing2.txt')
56 cw.start_new_stream('dir1')
58 cw.start_new_file('thing3.txt')
60 cw.start_new_file('thing4.txt')
63 cw.start_new_stream('dir2')
64 cw.start_new_file('thing5.txt')
66 cw.start_new_file('thing6.txt')
69 cw.start_new_stream('dir2/dir3')
70 cw.start_new_file('thing7.txt')
73 cw.start_new_file('thing8.txt')
76 cw.start_new_stream('edgecases')
# Awkward names, including "." and ".."; the body of this first loop is not
# shown here.
77 for f in ":/./../.../-/*/\x01\\/ ".split("/"):
# Same idea for directory names: one stream per name under edgecases/dirs,
# each given a file named 'x/x'.
81 for f in ":/../.../-/*/\x01\\/ ".split("/"):
82 cw.start_new_stream('edgecases/dirs/' + f)
83 cw.start_new_file('x/x')
# Keep the value returned by finish() for mounting below, and create a
# collection record from the writer's manifest text.
86 self.testcollection = cw.finish()
87 self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
90 # Create the request handler
91 operations = fuse.Operations(os.getuid(), os.getgid())
# Register a CollectionDirectory for the test collection, passing
# llfuse.ROOT_INODE as its first argument.
92 e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.testcollection))
94 llfuse.init(operations, self.mounttmp, [])
# Thread object whose target runs llfuse.main(); the line that starts it is
# not shown here.
95 t = threading.Thread(None, lambda: llfuse.main())
98 # wait until the driver is finished initializing
99 operations.initlock.wait()
101 # now check some stuff
102 self.assertDirContents(None, ['thing1.txt', 'thing2.txt',
103 'edgecases', 'dir1', 'dir2'])
104 self.assertDirContents('dir1', ['thing3.txt', 'thing4.txt'])
105 self.assertDirContents('dir2', ['thing5.txt', 'thing6.txt', 'dir3'])
106 self.assertDirContents('dir2/dir3', ['thing7.txt', 'thing8.txt'])
# Compared with the names written above, "." and ".." are expected to be
# listed as "_" and "__"; the other names are expected unchanged.
107 self.assertDirContents('edgecases',
108 "dirs/:/_/__/.../-/*/\x01\\/ ".split("/"))
109 self.assertDirContents('edgecases/dirs',
110 ":/__/.../-/*/\x01\\/ ".split("/"))
# Expected contents of each file, keyed by path relative to the mount point.
112 files = {'thing1.txt': 'data 1',
113 'thing2.txt': 'data 2',
114 'dir1/thing3.txt': 'data 3',
115 'dir1/thing4.txt': 'data 4',
116 'dir2/thing5.txt': 'data 5',
117 'dir2/thing6.txt': 'data 6',
118 'dir2/dir3/thing7.txt': 'data 7',
119 'dir2/dir3/thing8.txt': 'data 8'}
# Read every file through the mount and compare with the expected data.
121 for k, v in files.items():
122 with open(os.path.join(self.mounttmp, k)) as f:
123 self.assertEqual(v, f.read())
# Mounts fuse.MagicDirectory and checks that a collection shows up in the
# listings only after it has been looked up by name.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# prefix on each line and omits some lines (method headers, parts of
# multi-line calls, the initialisation of `files`), so only visible
# statements are described.
126 class FuseMagicTest(MountTestBase):
128 super(FuseMagicTest, self).setUp()
# Build a collection with a single file and create a collection record from
# the writer's manifest text.
130 cw = arvados.CollectionWriter()
132 cw.start_new_file('thing1.txt')
135 self.testcollection = cw.finish()
136 self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
139 # Create the request handler
140 operations = fuse.Operations(os.getuid(), os.getgid())
141 e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
143 llfuse.init(operations, self.mounttmp, [])
# Thread object whose target runs llfuse.main(); the line that starts it is
# not shown here.
144 t = threading.Thread(None, lambda: llfuse.main())
147 # wait until the driver is finished initializing
148 operations.initlock.wait()
150 # now check some stuff
# Initial listing: README must be present, and no entry may match the Keep
# locator or UUID patterns (the iteration clause of any(...) is on a line
# not shown here).
151 mount_ls = os.listdir(self.mounttmp)
152 self.assertIn('README', mount_ls)
153 self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
154 arvados.util.uuid_pattern.match(fn)
156 "new FUSE MagicDirectory lists Collection")
# List the collection directly under the mount point and under by_id (the
# rest of the second call is not shown here).
157 self.assertDirContents(self.testcollection, ['thing1.txt'])
158 self.assertDirContents(os.path.join('by_id', self.testcollection),
# After those lookups the collection is expected in the top-level listing
# and in the by_id listing.
160 mount_ls = os.listdir(self.mounttmp)
161 self.assertIn('README', mount_ls)
162 self.assertIn(self.testcollection, mount_ls)
163 self.assertIn(self.testcollection,
164 os.listdir(os.path.join(self.mounttmp, 'by_id')))
# NOTE(review): the key stored here already begins with self.mounttmp, and
# the loop below joins self.mounttmp onto it again; presumably mounttmp is
# absolute so the join returns the key unchanged -- confirm.
167 files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
169 for k, v in files.items():
170 with open(os.path.join(self.mounttmp, k)) as f:
171 self.assertEqual(v, f.read())
# Mounts fuse.TagsDirectory and checks three levels of listings: tag name,
# tagged collection UUID, and the collection's contents.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# prefix on each line and omits some lines (the method header and the thread
# start among them), so only visible statements are described.
174 class FuseTagsTest(MountTestBase):
176 operations = fuse.Operations(os.getuid(), os.getgid())
177 e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
179 llfuse.init(operations, self.mounttmp, [])
# Thread object whose target runs llfuse.main(); the line that starts it is
# not shown here.
180 t = threading.Thread(None, lambda: llfuse.main())
183 # wait until the driver is finished initializing
184 operations.initlock.wait()
# Top level of the mount: exactly one entry, 'foo_tag'.
186 d1 = os.listdir(self.mounttmp)
188 self.assertEqual(['foo_tag'], d1)
# Inside the tag directory: exactly one entry, named by a UUID.
190 d2 = os.listdir(os.path.join(self.mounttmp, 'foo_tag'))
192 self.assertEqual(['zzzzz-4zz18-fy296fx3hot09f7'], d2)
# Inside that entry: exactly one entry, 'foo'.
194 d3 = os.listdir(os.path.join(self.mounttmp, 'foo_tag', 'zzzzz-4zz18-fy296fx3hot09f7'))
196 self.assertEqual(['foo'], d3)
# Mounts fuse.TagsDirectory with poll_time=1 and checks that the listings
# follow links that are created and deleted while the mount is live.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# prefix on each line and omits some lines (the rest of tag_collection, the
# test method header, anything between the steps below), so only visible
# statements are described.
199 class FuseTagsUpdateTest(MountTestBase):
# Create a link through the API whose head_uuid is coll_uuid and return the
# result of that call chain; the remaining body fields, including how
# tag_name is used, are on lines not shown here.
200 def tag_collection(self, coll_uuid, tag_name):
201 return self.api.links().create(
202 body={'link': {'head_uuid': coll_uuid,
208 operations = fuse.Operations(os.getuid(), os.getgid())
209 e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, poll_time=1))
211 llfuse.init(operations, self.mounttmp, [])
# Thread object whose target runs llfuse.main(); the line that starts it is
# not shown here.
212 t = threading.Thread(None, lambda: llfuse.main())
215 # wait until the driver is finished initializing
216 operations.initlock.wait()
217 self.assertIn('foo_tag', os.listdir(self.mounttmp))
# Tag the bar_file fixture collection; the new tag directory should appear
# and contain only that collection.
219 bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
220 self.tag_collection(bar_uuid, 'fuse_test_tag')
222 self.assertIn('fuse_test_tag', os.listdir(self.mounttmp))
223 self.assertDirContents('fuse_test_tag', [bar_uuid])
# Tag baz_file with the same tag, keeping the created link so it can be
# deleted below.
225 baz_uuid = run_test_server.fixture('collections')['baz_file']['uuid']
226 l = self.tag_collection(baz_uuid, 'fuse_test_tag')
228 self.assertDirContents('fuse_test_tag', [bar_uuid, baz_uuid])
# Delete the second link; only bar_file should remain under the tag.
230 self.api.links().delete(uuid=l['uuid']).execute()
232 self.assertDirContents('fuse_test_tag', [bar_uuid])
# Mounts fuse.SharedDirectory for the current user's UUID and checks the
# listings under 'FUSE User' and 'FUSE Test Project'.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# prefix on each line and omits some lines (the method header, the ends of
# the expected lists, part of the open() call, the assignment of j), so only
# visible statements are described.
235 class FuseSharedTest(MountTestBase):
237 operations = fuse.Operations(os.getuid(), os.getgid())
238 e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()['uuid']))
240 llfuse.init(operations, self.mounttmp, [])
# Thread object whose target runs llfuse.main(); the line that starts it is
# not shown here.
241 t = threading.Thread(None, lambda: llfuse.main())
244 # wait until the driver is finished initializing
245 operations.initlock.wait()
247 # shared_dirs is a list of the directories exposed
248 # by fuse.SharedDirectory (i.e. any object visible
249 # to the current user)
250 shared_dirs = os.listdir(self.mounttmp)
252 self.assertIn('FUSE User', shared_dirs)
254 # fuse_user_objs is a list of the objects owned by the FUSE
255 # test user (which present as files in the 'FUSE User'
257 fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
# Sorted in place so the comparison does not depend on listing order.
258 fuse_user_objs.sort()
259 self.assertEqual(['Empty collection.link', # permission link on collection
260 'FUSE Test Project', # project owned by user
261 'collection #1 owned by FUSE', # collection owned by user
262 'collection #2 owned by FUSE', # collection owned by user
263 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
266 # test_proj_files is a list of the files in the FUSE Test Project.
267 test_proj_files = os.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
# Sorted in place so the comparison does not depend on listing order.
268 test_proj_files.sort()
269 self.assertEqual(['collection in FUSE project',
270 'pipeline instance in FUSE project.pipelineInstance',
271 'pipeline template in FUSE project.pipelineTemplate'
274 # Double check that we can open and read objects in this folder as a file,
275 # and that its contents are what we expect.
276 with open(os.path.join(
280 'pipeline template in FUSE project.pipelineTemplate')) as f:
# j is assigned on a line not shown here (presumably parsed from f -- confirm).
282 self.assertEqual("pipeline template in FUSE project", j['name'])
# Mounts fuse.ProjectDirectory for the current user record and checks three
# levels of listings beneath the mount point.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# prefix on each line and omits some lines (the method header and the thread
# start among them), so only visible statements are described.
285 class FuseHomeTest(MountTestBase):
287 operations = fuse.Operations(os.getuid(), os.getgid())
# Unlike the SharedDirectory test, the whole result of users().current() is
# passed here rather than only its 'uuid' field.
288 e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()))
290 llfuse.init(operations, self.mounttmp, [])
# Thread object whose target runs llfuse.main(); the line that starts it is
# not shown here.
291 t = threading.Thread(None, lambda: llfuse.main())
294 # wait until the driver is finished initializing
295 operations.initlock.wait()
# Top level: must include 'Unrestricted public data'.
297 d1 = os.listdir(self.mounttmp)
299 self.assertIn('Unrestricted public data', d1)
# Inside it: exactly one entry.
301 d2 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data'))
303 self.assertEqual(['GNU General Public License, version 3'], d2)
# Inside that entry: exactly one PDF file.
305 d3 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
307 self.assertEqual(["GNU_General_Public_License,_version_3.pdf"], d3)
# Unit test for fuse.sanitize_filename; it does not use the mount fixture.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# prefix on each line and omits some lines (the name lists and the first loop
# header among them), so only visible statements are described.
310 class FuseUnitTest(unittest.TestCase):
311 def test_sanitize_filename(self):
# A name that should come back from sanitize_filename unchanged; the loop
# header that supplies f for this assertion is not shown here.
331 self.assertEqual(f, fuse.sanitize_filename(f))
# Every name in `unacceptable` must be altered by sanitize_filename.
332 for f in unacceptable:
333 self.assertNotEqual(f, fuse.sanitize_filename(f))
334 # The sanitized filename should be the same length, though.
335 self.assertEqual(len(f), len(fuse.sanitize_filename(f)))
# Special cases: the empty string and "." both become "_", ".." becomes "__".
337 self.assertEqual("_", fuse.sanitize_filename(""))
338 self.assertEqual("_", fuse.sanitize_filename("."))
339 self.assertEqual("__", fuse.sanitize_filename(".."))