import arvados
+import arvados.safeapi
import arvados_fuse as fuse
import glob
import json
self.mounttmp = tempfile.mkdtemp()
run_test_server.run()
run_test_server.authorize_with("admin")
- self.api = fuse.SafeApi(arvados.config)
+ self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
def make_mount(self, root_class, **root_kwargs):
- operations = fuse.Operations(os.getuid(), os.getgid())
+ operations = fuse.Operations(os.getuid(), os.getgid(), cache_cap=2)
operations.inodes.add_entry(root_class(
llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
llfuse.init(operations, self.mounttmp, [])
# directory)
fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
fuse_user_objs.sort()
- self.assertEqual(['Empty collection.link', # permission link on collection
- 'FUSE Test Project', # project owned by user
+ self.assertEqual(['FUSE Test Project', # project owned by user
'collection #1 owned by FUSE', # collection owned by user
'collection #2 owned by FUSE', # collection owned by user
'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
# Double check that we can open and read objects in this folder as a file,
# and that its contents are what we expect.
- with open(os.path.join(
+ pipeline_template_path = os.path.join(
self.mounttmp,
'FUSE User',
'FUSE Test Project',
- 'pipeline template in FUSE project.pipelineTemplate')) as f:
+ 'pipeline template in FUSE project.pipelineTemplate')
+ with open(pipeline_template_path) as f:
j = json.load(f)
self.assertEqual("pipeline template in FUSE project", j['name'])
+ # check mtime on template
+ st = os.stat(pipeline_template_path)
+ self.assertEqual(st.st_mtime, 1397493304)
+
+ # check mtime on collection
+ st = os.stat(os.path.join(
+ self.mounttmp,
+ 'FUSE User',
+ 'collection #1 owned by FUSE'))
+ self.assertEqual(st.st_mtime, 1391448174)
+
class FuseHomeTest(MountTestBase):
def runTest(self):