self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item))
self._entries[name].populate(mtime)
else:
- self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
+ self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
item.fuse_entry = self._entries[name]
def on_event(self, event, collection, name, item):
class FuseArvadosFile(File):
"""Wraps a ArvadosFile."""
- __slots__ = ('arvfile',)
+ __slots__ = ('arvfile', '_enable_write')
- def __init__(self, parent_inode, arvfile, _mtime):
+ def __init__(self, parent_inode, arvfile, _mtime, enable_write):  # enable_write: new trailing argument, stored below and read by writable()
super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
self.arvfile = arvfile
+ self._enable_write = enable_write  # kept on the instance so writable() can check it before asking arvfile
def size(self):
with llfuse.lock_released:
return False
def writable(self):
- return self.arvfile.writable()
+ return self._enable_write and self.arvfile.writable()  # short-circuits: arvfile.writable() is only consulted when _enable_write is truthy
def flush(self):
with llfuse.lock_released:
llfuse.close()
def make_mount(self, root_class, **root_kwargs):
+ enable_write = True
+ if 'enable_write' in root_kwargs:
+ enable_write = root_kwargs.pop('enable_write')
self.operations = fuse.Operations(
os.getuid(), os.getgid(),
api_client=self.api,
- enable_write=True)
+ enable_write=enable_write)
self.operations.inodes.add_entry(root_class(
- llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, True, **root_kwargs))
+ llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, enable_write, **root_kwargs))
llfuse.init(self.operations, self.mounttmp, [])
self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
self.llfuse_thread.daemon = True
@staticmethod
def _test_collection_custom_storage_classes(self, coll):
self.assertEqual(storage_classes_desired(coll), ['foo'])
+
+def _readonlyCollectionTestHelper(mounttmp):
+    """Open thing1.txt under *mounttmp* in text-read mode, then close it.
+
+    Nothing is read from the file. Any exception raised by open() or
+    close() propagates to the caller.
+    """
+    target = os.path.join(mounttmp, 'thing1.txt')
+    handle = open(target, 'rt')
+    # The close() call is what this helper exercises: it must complete
+    # without raising an error.
+    handle.close()
+
+class ReadonlyCollectionTest(MountTestBase):
+    """Mount a saved collection with enable_write=False and open/close a file in it."""
+
+    def setUp(self):
+        """Create a collection containing thing1.txt and save it under the "aproject" fixture group."""
+        super(ReadonlyCollectionTest, self).setUp()
+        collection = arvados.collection.Collection()
+        with collection.open('thing1.txt', 'wt') as writer:
+            writer.write("data 1")
+        project_uuid = run_test_server.fixture("groups")["aproject"]["uuid"]
+        collection.save_new(owner_uuid=project_uuid)
+        # Keep the API record so runTest can pass it to make_mount.
+        self.testcollection = collection.api_response()
+
+    def runTest(self):
+        """Mount the saved collection with writes disabled and run the open/close helper in the pool."""
+        api_settings = arvados.config.settings().copy()
+        viewer_token = run_test_server.fixture("api_client_authorizations")["project_viewer"]["api_token"]
+        # Swap in the "project_viewer" fixture token before building the API client.
+        api_settings["ARVADOS_API_TOKEN"] = viewer_token
+        self.api = arvados.safeapi.ThreadSafeApiCache(api_settings)
+        self.make_mount(
+            fuse.CollectionDirectory,
+            collection_record=self.testcollection,
+            enable_write=False,
+        )
+
+        helper_args = (self.mounttmp,)
+        self.pool.apply(_readonlyCollectionTestHelper, helper_args)