- operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.api.users().current().execute()['uuid']))
-
- llfuse.init(operations, self.mounttmp, [])
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
-
- # wait until the driver is finished initializing
- operations.initlock.wait()
-
- d1 = os.listdir(self.mounttmp)
- d1.sort()
- self.assertIn('Active User', d1)
-
- d2 = os.listdir(os.path.join(self.mounttmp, 'Active User'))
- d2.sort()
- self.assertEqual(['A Project',
- "Empty collection",
- "Empty collection.link",
- "Pipeline Template with Input Parameter with Search.pipelineTemplate",
- "Pipeline Template with Jobspec Components.pipelineTemplate",
- "pipeline_with_job.pipelineInstance"
- ], d2)
-
- d3 = os.listdir(os.path.join(self.mounttmp, 'Active User', 'A Project'))
- d3.sort()
- self.assertEqual(["A Subproject",
- "Two Part Pipeline Template.pipelineTemplate",
- "collection_to_move_around",
- "zzzzz-4zz18-fy296fx3hot09f7 added sometime"
- ], d3)
-
- with open(os.path.join(self.mounttmp, 'Active User', "A Project", "Two Part Pipeline Template.pipelineTemplate")) as f:
+ self.make_mount(fuse.SharedDirectory,
+ self.api.users().current().execute()['uuid'])
+
+ # shared_dirs is a list of the directories exposed
+ # by fuse.SharedDirectory (i.e. any object visible
+ # to the current user)
+ shared_dirs = os.listdir(self.mounttmp)
+ shared_dirs.sort()
+ self.assertIn('FUSE User', shared_dirs)
+
+ # fuse_user_objs is a list of the objects owned by the FUSE
+ # test user (which appear as files in the 'FUSE User'
+ # directory)
+ fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
+ fuse_user_objs.sort()
+ self.assertEqual(['Empty collection.link', # permission link on collection
+ 'FUSE Test Project', # project owned by user
+ 'collection #1 owned by FUSE', # collection owned by user
+ 'collection #2 owned by FUSE', # collection owned by user
+ 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
+ ], fuse_user_objs)
+
+ # test_proj_files is a list of the files in the FUSE Test Project.
+ test_proj_files = os.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
+ test_proj_files.sort()
+ self.assertEqual(['collection in FUSE project',
+ 'pipeline instance in FUSE project.pipelineInstance',
+ 'pipeline template in FUSE project.pipelineTemplate'
+ ], test_proj_files)
+
+ # Double-check that we can open and read an object in this folder as a file,
+ # and that its contents are what we expect.
+ with open(os.path.join(
+ self.mounttmp,
+ 'FUSE User',
+ 'FUSE Test Project',
+ 'pipeline template in FUSE project.pipelineTemplate')) as f: