3198: Add enable_write flag to FUSE and --enable-write and --read-only to
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index d90bd8b398ed014e7becd3df6af5834df2c655b2..f661e41483f5fb3a3bc952c44796c0d8c317b796 100644 (file)
@@ -10,7 +10,7 @@ from apiclient import errors as apiclient_errors
 import errno
 
 from fusefile import StringFile, ObjectFile, FuseArvadosFile
-from fresh import FreshBase, convertTime, use_counter
+from fresh import FreshBase, convertTime, use_counter, check_update
 
 import arvados.collection
 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
@@ -45,9 +45,10 @@ class Directory(FreshBase):
     """
 
     def __init__(self, parent_inode, inodes):
+        """parent_inode is the integer inode number"""
+
         super(Directory, self).__init__()
 
-        """parent_inode is the integer inode number"""
         self.inode = None
         if not isinstance(parent_inode, int):
             raise Exception("parent_inode should be an int")
@@ -78,23 +79,23 @@ class Directory(FreshBase):
                 _logger.warn(e)
 
     @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         return self._entries[item]
 
     @use_counter
+    @check_update
     def items(self):
-        self.checkupdate()
         return list(self._entries.items())
 
     @use_counter
+    @check_update
     def __contains__(self, k):
-        self.checkupdate()
         return k in self._entries
 
     @use_counter
+    @check_update
     def __len__(self):
-        self.checkupdate()
         return len(self._entries)
 
     def fresh(self):
@@ -142,8 +143,8 @@ class Directory(FreshBase):
 
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
-            _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
-            llfuse.invalidate_entry(self.inode, str(i))
+            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
+            llfuse.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
             self.inodes.del_entry(oldentries[i])
             changed = True
 
@@ -164,7 +165,7 @@ class Directory(FreshBase):
                     self._entries = oldentries
                     return False
             for n in oldentries:
-                llfuse.invalidate_entry(self.inode, str(n))
+                llfuse.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                 self.inodes.del_entry(oldentries[n])
             llfuse.invalidate_inode(self.inode)
             self.invalidate()
@@ -196,7 +197,22 @@ class Directory(FreshBase):
     def rename(self, name_old, name_new, src):
         raise NotImplementedError()
 
+
 class CollectionDirectoryBase(Directory):
+    """Represent an Arvados Collection as a directory.
+
+    This class is used for Subcollections, and is also the base class for
+    CollectionDirectory, which implements collection loading/saving on
+    Collection records.
+
+    Most operations act only on the underlying Arvados `Collection` object.  The
+    `Collection` object signals via a notify callback to
+    `CollectionDirectoryBase.on_event` that an item was added, removed or
+    modified.  FUSE inodes and directory entries are created, deleted or
+    invalidated in response to these events.
+
+    """
+
     def __init__(self, parent_inode, inodes, collection):
         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
         self.collection = collection
@@ -219,14 +235,15 @@ class CollectionDirectoryBase(Directory):
 
     def on_event(self, event, collection, name, item):
         if collection == self.collection:
-            _logger.debug("%s %s %s %s", event, collection, name, item)
+            name = sanitize_filename(name)
+            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
             with llfuse.lock:
                 if event == arvados.collection.ADD:
                     self.new_entry(name, item, self.mtime())
                 elif event == arvados.collection.DEL:
                     ent = self._entries[name]
                     del self._entries[name]
-                    llfuse.invalidate_entry(self.inode, name)
+                    llfuse.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                     self.inodes.del_entry(ent)
                 elif event == arvados.collection.MOD:
                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
@@ -243,37 +260,64 @@ class CollectionDirectoryBase(Directory):
     def writable(self):
         return self.collection.writable()
 
+    @use_counter
     def flush(self):
         with llfuse.lock_released:
             self.collection.root_collection().save()
 
+    @use_counter
+    @check_update
     def create(self, name):
         with llfuse.lock_released:
             self.collection.open(name, "w").close()
 
+    @use_counter
+    @check_update
     def mkdir(self, name):
         with llfuse.lock_released:
             self.collection.mkdirs(name)
 
+    @use_counter
+    @check_update
     def unlink(self, name):
         with llfuse.lock_released:
             self.collection.remove(name)
+        self.flush()
 
+    @use_counter
+    @check_update
     def rmdir(self, name):
         with llfuse.lock_released:
             self.collection.remove(name)
+        self.flush()
 
+    @use_counter
+    @check_update
     def rename(self, name_old, name_new, src):
+        if not isinstance(src, CollectionDirectoryBase):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if name_new in self:
+            ent = src[name_old]
+            tgt = self[name_new]
+            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
+                pass
+            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
+                if len(tgt) > 0:
+                    raise llfuse.FUSEError(errno.ENOTEMPTY)
+            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
+                raise llfuse.FUSEError(errno.ENOTDIR)
+            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
+                raise llfuse.FUSEError(errno.EISDIR)
+
         with llfuse.lock_released:
-            if not isinstance(src, CollectionDirectoryBase):
-                raise llfuse.FUSEError(errno.EPERM)
             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
-            self.flush()
-            src.flush()
+        self.flush()
+        src.flush()
 
 
 class CollectionDirectory(CollectionDirectoryBase):
-    """Represents the root of a directory tree holding a collection."""
+    """Represents the root of a directory tree representing a collection."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
@@ -327,7 +371,8 @@ class CollectionDirectory(CollectionDirectoryBase):
     def uuid(self):
         return self.collection_locator
 
-    def update(self):
+    @use_counter
+    def update(self, to_record_version=None):
         try:
             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
@@ -343,8 +388,11 @@ class CollectionDirectory(CollectionDirectoryBase):
                         return
 
                     _logger.debug("Updating %s", self.collection_locator)
-                    if self.collection:
-                        self.collection.update()
+                    if self.collection is not None:
+                        if self.collection.known_past_version(to_record_version):
+                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
+                        else:
+                            self.collection.update()
                     else:
                         if uuid_pattern.match(self.collection_locator):
                             coll_reader = arvados.collection.Collection(
@@ -387,8 +435,9 @@ class CollectionDirectory(CollectionDirectoryBase):
                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         return False
 
+    @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         if item == '.arvados#collection':
             if self.collection_record_file is None:
                 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
@@ -417,6 +466,13 @@ class CollectionDirectory(CollectionDirectoryBase):
         # footprint directly would be more accurate, but also more complicated.
         return self._manifest_size * 128
 
+    def finalize(self):
+        if self.collection is not None:
+            if self.writable():
+                self.collection.save()
+            self.collection.stop_threads()
+
+
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
 
@@ -506,6 +562,7 @@ class TagsDirectory(RecursiveInvalidateDirectory):
         self._poll = True
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             tags = self.api.links().list(
@@ -533,6 +590,7 @@ class TagDirectory(Directory):
         self._poll = poll
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             taggedcollections = self.api.links().list(
@@ -581,6 +639,7 @@ class ProjectDirectory(Directory):
     def uuid(self):
         return self.project_uuid
 
+    @use_counter
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
@@ -634,8 +693,9 @@ class ProjectDirectory(Directory):
         finally:
             self._updating_lock.release()
 
+    @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         if item == '.arvados#project':
             return self.project_object_file
         else:
@@ -647,6 +707,8 @@ class ProjectDirectory(Directory):
         else:
             return super(ProjectDirectory, self).__contains__(k)
 
+    @use_counter
+    @check_update
     def writable(self):
         with llfuse.lock_released:
             if not self._current_user:
@@ -656,6 +718,8 @@ class ProjectDirectory(Directory):
     def persisted(self):
         return True
 
+    @use_counter
+    @check_update
     def mkdir(self, name):
         try:
             with llfuse.lock_released:
@@ -667,6 +731,8 @@ class ProjectDirectory(Directory):
             _logger.error(error)
             raise llfuse.FUSEError(errno.EEXIST)
 
+    @use_counter
+    @check_update
     def rmdir(self, name):
         if name not in self:
             raise llfuse.FUSEError(errno.ENOENT)
@@ -678,12 +744,32 @@ class ProjectDirectory(Directory):
             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
         self.invalidate()
 
+    @use_counter
+    @check_update
     def rename(self, name_old, name_new, src):
-        with llfuse.lock_released:
-            if not isinstance(src, ProjectDirectory):
-                raise llfuse.FUSEError(errno.EPERM)
+        if not isinstance(src, ProjectDirectory):
+            raise llfuse.FUSEError(errno.EPERM)
 
-        raise NotImplementedError()
+        ent = src[name_old]
+
+        if not isinstance(ent, CollectionDirectory):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if name_new in self:
+            # POSIX semantics for replacing one directory with another are
+            # tricky (the target directory must be empty, and the operation must
+            # be atomic, which isn't possible with the Arvados API as of this
+            # writing), so don't support that.
+            raise llfuse.FUSEError(errno.EPERM)
+
+        self.api.collections().update(uuid=ent.uuid(),
+                                      body={"owner_uuid": self.uuid(),
+                                            "name": name_new}).execute(num_retries=self.num_retries)
+
+        # Actually move the entry from source directory to this directory.
+        del src._entries[name_old]
+        self._entries[name_new] = ent
+        llfuse.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
 
 
 class SharedDirectory(Directory):
@@ -698,6 +784,7 @@ class SharedDirectory(Directory):
         self._poll = True
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             all_projects = arvados.util.list_all(