run_test_server.authorize_with("admin")
self.api = api = fuse.SafeApi(arvados.config)
- def make_mount(self, root_class, *root_args):
+ def make_mount(self, root_class, **root_kwargs):
operations = fuse.Operations(os.getuid(), os.getgid())
operations.inodes.add_entry(root_class(
- llfuse.ROOT_INODE, operations.inodes, self.api, 0, *root_args))
+ llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
llfuse.init(operations, self.mounttmp, [])
threading.Thread(None, llfuse.main).start()
# wait until the driver is finished initializing
self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
def runTest(self):
- self.make_mount(fuse.CollectionDirectory, self.testcollection)
+ self.make_mount(fuse.CollectionDirectory, collection=self.testcollection)
self.assertDirContents(None, ['thing1.txt', 'thing2.txt',
'edgecases', 'dir1', 'dir2'])
}}).execute()
def runTest(self):
- operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, poll_time=1))
-
- llfuse.init(operations, self.mounttmp, [])
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
+ self.make_mount(fuse.TagsDirectory, poll_time=1)
- # wait until the driver is finished initializing
- operations.initlock.wait()
self.assertIn('foo_tag', os.listdir(self.mounttmp))
bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
- self.api.users().current().execute()['uuid'])
+ exclude=self.api.users().current().execute()['uuid'])
# shared_dirs is a list of the directories exposed
# by fuse.SharedDirectory (i.e. any object visible
class FuseHomeTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.ProjectDirectory,
- self.api.users().current().execute())
+ project_object=self.api.users().current().execute())
d1 = os.listdir(self.mounttmp)
self.assertIn('Unrestricted public data', d1)