-import unittest
import arvados
+import arvados.safeapi
import arvados_fuse as fuse
-import threading
-import time
-import os
+import glob
+import json
import llfuse
-import tempfile
+import os
import shutil
import subprocess
-import glob
+import sys
+import tempfile
+import threading
+import time
+import unittest
+
import run_test_server
-import json
class MountTestBase(unittest.TestCase):
def setUp(self):
+ # Prepare scratch directories, the test API server, and an API client
+ # stored on self.api for use by the individual tests.
+ # Scratch directory exported through the KEEP_LOCAL_STORE environment variable.
self.keeptmp = tempfile.mkdtemp()
os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
+ # Scratch directory that make_mount hands to llfuse.init as the mountpoint.
self.mounttmp = tempfile.mkdtemp()
- run_test_server.run(False)
+ run_test_server.run()
run_test_server.authorize_with("admin")
- self.api = api = fuse.SafeApi(arvados.config)
+ self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
- def tearDown(self):
- run_test_server.stop()
+ def make_mount(self, root_class, **root_kwargs):
+ # Mount a FUSE filesystem at self.mounttmp whose root entry is an
+ # instance of root_class. Any root_kwargs are forwarded unchanged to
+ # the root_class constructor. Returns nothing.
+ operations = fuse.Operations(os.getuid(), os.getgid())
+ # The root entry receives llfuse.ROOT_INODE, the operations' inodes
+ # object, the API client and a literal 0 as positional arguments.
+ # NOTE(review): the meaning of the 0 is not visible here -- confirm
+ # against the directory class constructors.
+ operations.inodes.add_entry(root_class(
+ llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
+ llfuse.init(operations, self.mounttmp, [])
+ # llfuse.main is started on its own thread so this method can return
+ # while the filesystem keeps running.
+ threading.Thread(None, llfuse.main).start()
+ # wait until the driver is finished initializing
+ operations.initlock.wait()
+ def tearDown(self):
# llfuse.close is buggy, so use fusermount instead.
#llfuse.close(unmount=True)
count = 0
os.rmdir(self.mounttmp)
shutil.rmtree(self.keeptmp)
+ run_test_server.reset()
def assertDirContents(self, subdir, expect_content):
path = self.mounttmp
self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
def runTest(self):
- # Create the request handler
- operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.testcollection))
-
- llfuse.init(operations, self.mounttmp, [])
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
-
- # wait until the driver is finished initializing
- operations.initlock.wait()
+ self.make_mount(fuse.CollectionDirectory, collection=self.testcollection)
- # now check some stuff
self.assertDirContents(None, ['thing1.txt', 'thing2.txt',
'edgecases', 'dir1', 'dir2'])
self.assertDirContents('dir1', ['thing3.txt', 'thing4.txt'])
self.assertEqual(v, f.read())
+class FuseNoAPITest(MountTestBase):
+ # Stores a file and a one-line manifest directly in the local Keep store,
+ # then checks that both can be listed and read through a MagicDirectory
+ # mount addressed by the manifest's locator.
+ def setUp(self):
+ super(FuseNoAPITest, self).setUp()
+ # Keep client backed by the scratch directory created in the base setUp.
+ keep = arvados.keep.KeepClient(local_store=self.keeptmp)
+ self.file_data = "API-free text\n"
+ self.file_loc = keep.put(self.file_data)
+ # The manifest text is itself stored with keep.put; the value returned
+ # for it is used below as the directory name under the mount.
+ self.coll_loc = keep.put(". {} 0:{}:api-free.txt\n".format(
+ self.file_loc, len(self.file_data)))
+
+ def runTest(self):
+ self.make_mount(fuse.MagicDirectory)
+ self.assertDirContents(self.coll_loc, ['api-free.txt'])
+ # Read the whole file back through the mount and compare it with the
+ # data that was stored in setUp.
+ with open(os.path.join(
+ self.mounttmp, self.coll_loc, 'api-free.txt')) as keep_file:
+ actual = keep_file.read(-1)
+ self.assertEqual(self.file_data, actual)
+
+
class FuseMagicTest(MountTestBase):
def setUp(self):
super(FuseMagicTest, self).setUp()
self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
def runTest(self):
- # Create the request handler
- operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
-
- llfuse.init(operations, self.mounttmp, [])
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
+ self.make_mount(fuse.MagicDirectory)
- # wait until the driver is finished initializing
- operations.initlock.wait()
-
- # now check some stuff
mount_ls = os.listdir(self.mounttmp)
self.assertIn('README', mount_ls)
self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
class FuseTagsTest(MountTestBase):
def runTest(self):
- operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
-
- llfuse.init(operations, self.mounttmp, [])
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
-
- # wait until the driver is finished initializing
- operations.initlock.wait()
+ self.make_mount(fuse.TagsDirectory)
d1 = os.listdir(self.mounttmp)
d1.sort()
}}).execute()
def runTest(self):
- operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, poll_time=1))
+ self.make_mount(fuse.TagsDirectory, poll_time=1)
- llfuse.init(operations, self.mounttmp, [])
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
-
- # wait until the driver is finished initializing
- operations.initlock.wait()
self.assertIn('foo_tag', os.listdir(self.mounttmp))
bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
class FuseSharedTest(MountTestBase):
def runTest(self):
- operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()['uuid']))
-
- llfuse.init(operations, self.mounttmp, [])
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
-
- # wait until the driver is finished initializing
- operations.initlock.wait()
+ self.make_mount(fuse.SharedDirectory,
+ exclude=self.api.users().current().execute()['uuid'])
# shared_dirs is a list of the directories exposed
# by fuse.SharedDirectory (i.e. any object visible
# Double check that we can open and read objects in this folder as a file,
# and that its contents are what we expect.
- with open(os.path.join(
+ pipeline_template_path = os.path.join(
self.mounttmp,
'FUSE User',
'FUSE Test Project',
- 'pipeline template in FUSE project.pipelineTemplate')) as f:
+ 'pipeline template in FUSE project.pipelineTemplate')
+ with open(pipeline_template_path) as f:
j = json.load(f)
self.assertEqual("pipeline template in FUSE project", j['name'])
+ # check mtime on template
+ st = os.stat(pipeline_template_path)
+ self.assertEqual(st.st_mtime, 1397493304)
-class FuseHomeTest(MountTestBase):
- def runTest(self):
- operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()))
+ # check mtime on collection
+ st = os.stat(os.path.join(
+ self.mounttmp,
+ 'FUSE User',
+ 'collection #1 owned by FUSE'))
+ self.assertEqual(st.st_mtime, 1391448174)
- llfuse.init(operations, self.mounttmp, [])
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
- # wait until the driver is finished initializing
- operations.initlock.wait()
+class FuseHomeTest(MountTestBase):
+ # Mounts a ProjectDirectory built from the current user's record and
+ # compares the listing of 'Unrestricted public data' with the collection
+ # fixtures.
+ def runTest(self):
+ self.make_mount(fuse.ProjectDirectory,
+ project_object=self.api.users().current().execute())
d1 = os.listdir(self.mounttmp)
- d1.sort()
self.assertIn('Unrestricted public data', d1)
d2 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data'))
- d2.sort()
- self.assertEqual(['GNU General Public License, version 3'], d2)
+ public_project = run_test_server.fixture('groups')[
+ 'anonymously_accessible_project']
+ # Counters ensure the loop below exercised both branches at least once,
+ # so the membership assertions cannot pass vacuously.
+ found_in = 0
+ found_not_in = 0
+ # NOTE(review): dict.iteritems() exists only on Python 2, and the loop
+ # variable `name` is never used in the body.
+ for name, item in run_test_server.fixture('collections').iteritems():
+ if 'name' not in item:
+ pass
+ elif item['owner_uuid'] == public_project['uuid']:
+ self.assertIn(item['name'], d2)
+ found_in += 1
+ else:
+ # Artificial assumption here: there is no public
+ # collection fixture with the same name as a
+ # non-public collection.
+ self.assertNotIn(item['name'], d2)
+ found_not_in += 1
+ self.assertNotEqual(0, found_in)
+ self.assertNotEqual(0, found_not_in)
d3 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
- d3.sort()
self.assertEqual(["GNU_General_Public_License,_version_3.pdf"], d3)