3198: Renaming collections in projects works. Improved conformance to POSIX semantic...
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index 898af31d263904d73dcd3583ac0c9f83cf54eb4b..9b93187e4f0f0ea3c35ec257b52df714fe23adfc 100644 (file)
@@ -5,6 +5,9 @@ import llfuse
 import arvados
 import apiclient
 import functools
+import threading
+from apiclient import errors as apiclient_errors
+import errno
 
 from fusefile import StringFile, ObjectFile, FuseArvadosFile
 from fresh import FreshBase, convertTime, use_counter
@@ -72,7 +75,7 @@ class Directory(FreshBase):
             try:
                 self.update()
             except apiclient.errors.HttpError as e:
-                _logger.debug(e)
+                _logger.warn(e)
 
     @use_counter
     def __getitem__(self, item):
@@ -89,6 +92,11 @@ class Directory(FreshBase):
         self.checkupdate()
         return k in self._entries
 
+    @use_counter
+    def __len__(self):  # Number of entries currently held by this directory.
+        self.checkupdate()  # same pre-check the other container methods run before touching _entries
+        return len(self._entries)
+
     def fresh(self):
         self.inodes.touch(self)
         super(Directory, self).fresh()
@@ -125,6 +133,7 @@ class Directory(FreshBase):
                     self._entries[name] = oldentries[name]
                     del oldentries[name]
                 else:
+                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                     # create new directory entry
                     ent = new_entry(i)
                     if ent is not None:
@@ -133,6 +142,7 @@ class Directory(FreshBase):
 
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
+            _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
             llfuse.invalidate_entry(self.inode, str(i))
             self.inodes.del_entry(oldentries[i])
             changed = True
@@ -171,6 +181,21 @@ class Directory(FreshBase):
     def flush(self):
         pass
 
+    def create(self, name):  # Create a file called `name` in this directory; not supported by the base class.
+        raise NotImplementedError()
+
+    def mkdir(self, name):  # Create a subdirectory called `name`; not supported by the base class.
+        raise NotImplementedError()
+
+    def unlink(self, name):  # Remove the file called `name`; not supported by the base class.
+        raise NotImplementedError()
+
+    def rmdir(self, name):  # Remove the subdirectory called `name`; not supported by the base class.
+        raise NotImplementedError()
+
+    def rename(self, name_old, name_new, src):  # Move `name_old` out of directory `src` into this directory as `name_new`; not supported by the base class.
+        raise NotImplementedError()
+
 class CollectionDirectoryBase(Directory):
     def __init__(self, parent_inode, inodes, collection):
         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
@@ -178,15 +203,23 @@ class CollectionDirectoryBase(Directory):
 
     def new_entry(self, name, item, mtime):
         name = sanitize_filename(name)
-        if isinstance(item, arvados.collection.RichCollectionBase):
+        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:  # item already carries a FUSE entry: re-attach it here instead of allocating a new one
+            if item.fuse_entry.dead is not True:  # only an entry flagged dead may be re-attached
+                raise Exception("Can only reparent dead inode entry")
+            if item.fuse_entry.inode is None:
+                raise Exception("Reparented entry must still have valid inode")
+            item.fuse_entry.dead = False  # the reused entry is live again
+            self._entries[name] = item.fuse_entry
+        elif isinstance(item, arvados.collection.RichCollectionBase):  # collection-like item: expose as a subdirectory
             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
             self._entries[name].populate(mtime)
         else:
             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
+        item.fuse_entry = self._entries[name]  # remember the entry on the item so a later new_entry() can take the reuse branch
 
     def on_event(self, event, collection, name, item):
-        _logger.warn("Got event! %s %s %s %s", event, collection, name, item)
         if collection == self.collection:
+            _logger.debug("%s %s %s %s", event, collection, name, item)
             with llfuse.lock:
                 if event == arvados.collection.ADD:
                     self.new_entry(name, item, self.mtime())
@@ -196,9 +229,10 @@ class CollectionDirectoryBase(Directory):
                     llfuse.invalidate_entry(self.inode, name)
                     self.inodes.del_entry(ent)
                 elif event == arvados.collection.MOD:
-                    ent = self._entries[name]
-                    llfuse.invalidate_inode(ent.inode)
-        _logger.warn("Finished handling event")
+                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
+                        llfuse.invalidate_inode(item.fuse_entry.inode)
+                    elif name in self._entries:
+                        llfuse.invalidate_inode(self._entries[name].inode)
 
     def populate(self, mtime):
         self._mtime = mtime
@@ -210,7 +244,47 @@ class CollectionDirectoryBase(Directory):
         return self.collection.writable()
 
     def flush(self):
-        self.collection.root_collection().save()
+        with llfuse.lock_released:  # drop the llfuse lock for the duration of save() (presumably because it can block on the API server; confirm)
+            self.collection.root_collection().save()
+
+    def create(self, name):  # Create `name` in the collection by opening it in "w" mode and closing it immediately.
+        with llfuse.lock_released:
+            self.collection.open(name, "w").close()
+
+    def mkdir(self, name):  # Create subdirectory `name` through the collection's mkdirs().
+        with llfuse.lock_released:
+            self.collection.mkdirs(name)
+
+    def unlink(self, name):  # Remove `name` from the collection (same remove() call as rmdir).
+        with llfuse.lock_released:
+            self.collection.remove(name)
+
+    def rmdir(self, name):  # Remove `name` from the collection (same remove() call as unlink).
+        with llfuse.lock_released:
+            self.collection.remove(name)
+
+    def rename(self, name_old, name_new, src):  # Move src[name_old] to self[name_new]; raises llfuse.FUSEError (EPERM/ENOTEMPTY/ENOTDIR/EISDIR) when the move is not allowed.
+        if not isinstance(src, CollectionDirectoryBase):  # the source must also be a collection directory
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if name_new in self:  # target exists: check the source/target type pairing before overwriting
+            ent = src[name_old]
+            tgt = self[name_new]
+            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):  # isinstance takes (object, class)
+                pass  # file over file: plain overwrite
+            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
+                if len(tgt) > 0:  # a directory may only replace an empty directory
+                    raise llfuse.FUSEError(errno.ENOTEMPTY)
+            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
+                raise llfuse.FUSEError(errno.ENOTDIR)
+            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
+                raise llfuse.FUSEError(errno.EISDIR)
+
+        with llfuse.lock_released:
+            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
+        self.flush()  # save this directory's root collection, then the source's
+        src.flush()
+
 
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree holding a collection."""
@@ -230,6 +304,7 @@ class CollectionDirectory(CollectionDirectoryBase):
         self._manifest_size = 0
         if self.collection_locator:
             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
+        self._updating_lock = threading.Lock()
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
@@ -275,38 +350,45 @@ class CollectionDirectory(CollectionDirectoryBase):
                 self.fresh()
                 return True
 
-            with llfuse.lock_released:
-                _logger.debug("Updating %s", self.collection_locator)
-                if self.collection:
-                    self.collection.update()
-                else:
-                    if uuid_pattern.match(self.collection_locator):
-                        coll_reader = arvados.collection.Collection(
-                            self.collection_locator, self.api, self.api.keep,
-                            num_retries=self.num_retries)
+            try:
+                with llfuse.lock_released:
+                    self._updating_lock.acquire()
+                    if not self.stale():
+                        return
+
+                    _logger.debug("Updating %s", self.collection_locator)
+                    if self.collection:
+                        self.collection.update()
                     else:
-                        coll_reader = arvados.collection.CollectionReader(
-                            self.collection_locator, self.api, self.api.keep,
-                            num_retries=self.num_retries)
-                    new_collection_record = coll_reader.api_response() or {}
-                    # If the Collection only exists in Keep, there will be no API
-                    # response.  Fill in the fields we need.
-                    if 'uuid' not in new_collection_record:
-                        new_collection_record['uuid'] = self.collection_locator
-                    if "portable_data_hash" not in new_collection_record:
-                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
-                    if 'manifest_text' not in new_collection_record:
-                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
-
-                    if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
-                        self.new_collection(new_collection_record, coll_reader)
-
-                    self._manifest_size = len(coll_reader.manifest_text())
-                    _logger.debug("%s manifest_size %i", self, self._manifest_size)
-            # end with llfuse.lock_released, re-acquire lock
+                        if uuid_pattern.match(self.collection_locator):
+                            coll_reader = arvados.collection.Collection(
+                                self.collection_locator, self.api, self.api.keep,
+                                num_retries=self.num_retries)
+                        else:
+                            coll_reader = arvados.collection.CollectionReader(
+                                self.collection_locator, self.api, self.api.keep,
+                                num_retries=self.num_retries)
+                        new_collection_record = coll_reader.api_response() or {}
+                        # If the Collection only exists in Keep, there will be no API
+                        # response.  Fill in the fields we need.
+                        if 'uuid' not in new_collection_record:
+                            new_collection_record['uuid'] = self.collection_locator
+                        if "portable_data_hash" not in new_collection_record:
+                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+                        if 'manifest_text' not in new_collection_record:
+                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
+
+                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
+                            self.new_collection(new_collection_record, coll_reader)
+
+                        self._manifest_size = len(coll_reader.manifest_text())
+                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
+                # end with llfuse.lock_released, re-acquire lock
 
-            self.fresh()
-            return True
+                self.fresh()
+                return True
+            finally:
+                self._updating_lock.release()
         except arvados.errors.NotFoundError:
             _logger.exception("arv-mount %s: error", self.collection_locator)
         except arvados.errors.ArgumentError as detail:
@@ -344,6 +426,9 @@ class CollectionDirectory(CollectionDirectoryBase):
         return (self.collection_locator is not None)
 
     def objsize(self):
+        # This is an empirically-derived heuristic to estimate the memory used
+        # to store this collection's metadata.  Calculating the memory
+        # footprint directly would be more accurate, but also more complicated.
         return self._manifest_size * 128
 
 class MagicDirectory(Directory):
@@ -486,9 +571,11 @@ class ProjectDirectory(Directory):
         self.num_retries = num_retries
         self.project_object = project_object
         self.project_object_file = None
-        self.uuid = project_object['uuid']
+        self.project_uuid = project_object['uuid']
         self._poll = poll
         self._poll_time = poll_time
+        self._updating_lock = threading.Lock()
+        self._current_user = None
 
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
@@ -506,7 +593,7 @@ class ProjectDirectory(Directory):
             return None
 
     def uuid(self):
-        return self.uuid
+        return self.project_uuid  # stored under a different attribute name so the instance attribute no longer shadows this method
 
     def update(self):
         if self.project_object_file == None:
@@ -530,31 +617,36 @@ class ProjectDirectory(Directory):
                 return None
 
         def samefn(a, i):
-            if isinstance(a, CollectionDirectory):
-                return a.collection_locator == i['uuid']
-            elif isinstance(a, ProjectDirectory):
-                return a.uuid == i['uuid']
+            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
+                return a.uuid() == i['uuid']
             elif isinstance(a, ObjectFile):
-                return a.uuid == i['uuid'] and not a.stale()
+                return a.uuid() == i['uuid'] and not a.stale()
             return False
 
-        with llfuse.lock_released:
-            if group_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.groups().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
-            elif user_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.users().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
+        try:
+            with llfuse.lock_released:
+                self._updating_lock.acquire()
+                if not self.stale():
+                    return
 
-            contents = arvados.util.list_all(self.api.groups().contents,
-                                             self.num_retries, uuid=self.uuid)
+                if group_uuid_pattern.match(self.project_uuid):
+                    self.project_object = self.api.groups().get(
+                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
+                elif user_uuid_pattern.match(self.project_uuid):
+                    self.project_object = self.api.users().get(
+                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
 
-        # end with llfuse.lock_released, re-acquire lock
+                contents = arvados.util.list_all(self.api.groups().contents,
+                                                 self.num_retries, uuid=self.project_uuid)
+
+            # end with llfuse.lock_released, re-acquire lock
 
-        self.merge(contents,
-                   namefn,
-                   samefn,
-                   self.createDirectory)
+            self.merge(contents,
+                       namefn,
+                       samefn,
+                       self.createDirectory)
+        finally:
+            self._updating_lock.release()
 
     def __getitem__(self, item):
         self.checkupdate()
@@ -569,9 +661,61 @@ class ProjectDirectory(Directory):
         else:
             return super(ProjectDirectory, self).__contains__(k)
 
+    def writable(self):  # True when the current API user's uuid appears in project_object["writable_by"].
+        with llfuse.lock_released:
+            if not self._current_user:  # look the current user up once and cache the record on the instance
+                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
+            return self._current_user["uuid"] in self.project_object["writable_by"]
+
     def persisted(self):
-        return False
+        return True
+
+    def mkdir(self, name):  # Create a collection called `name`, with empty manifest text, owned by this project.
+        try:
+            with llfuse.lock_released:
+                self.api.collections().create(body={"owner_uuid": self.project_uuid,
+                                                    "name": name,
+                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
+            self.invalidate()  # NOTE(review): presumably marks this directory stale so the new collection gets listed; confirm
+        except apiclient_errors.Error as error:
+            _logger.error(error)
+            raise llfuse.FUSEError(errno.EEXIST)  # NOTE(review): every API client error is reported as EEXIST, not only name clashes
+
+    def rmdir(self, name):  # Delete the empty collection called `name` from this project via the API.
+        if name not in self:
+            raise llfuse.FUSEError(errno.ENOENT)
+        if not isinstance(self[name], CollectionDirectory):  # only collection entries may be removed this way
+            raise llfuse.FUSEError(errno.EPERM)
+        if len(self[name]) > 0:  # refuse to delete a collection that still has entries
+            raise llfuse.FUSEError(errno.ENOTEMPTY)
+        with llfuse.lock_released:
+            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
+        self.invalidate()  # NOTE(review): presumably forces a re-list on next access; confirm
 
+    def rename(self, name_old, name_new, src):  # Move collection `name_old` from project directory `src` into this project as `name_new`.
+        if not isinstance(src, ProjectDirectory):  # the source must also be a project directory
+            raise llfuse.FUSEError(errno.EPERM)
+
+        ent = src[name_old]
+
+        if not isinstance(ent, CollectionDirectory):  # only collection entries can be moved
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if name_new in self:
+            # POSIX semantics for replacing one directory with another is
+            # tricky (the target directory must be empty, the operation must be
+            # atomic which isn't possible with the Arvados API as of this
+            # writing) so don't support that.
+            raise llfuse.FUSEError(errno.EPERM)
+
+        self.api.collections().update(uuid=ent.uuid(),  # NOTE(review): unlike mkdir/rmdir in this class, this API call is not wrapped in llfuse.lock_released
+                                      body={"owner_uuid": self.uuid(),
+                                            "name": name_new}).execute(num_retries=self.num_retries)
+
+        # Actually move the entry from source directory to this directory.
+        del src._entries[name_old]
+        self._entries[name_new] = ent
+        llfuse.invalidate_entry(src.inode, name_old)  # invalidate the old name under the source directory's inode
 
 class SharedDirectory(Directory):
     """A special directory that represents users or groups who have shared projects with me."""
@@ -620,11 +764,14 @@ class SharedDirectory(Directory):
             for r in root_owners:
                 if r in objects:
                     obr = objects[r]
-                    if "name" in obr:
+                    if obr.get("name"):
                         contents[obr["name"]] = obr
-                    if "first_name" in obr:
+                    #elif obr.get("username"):
+                    #    contents[obr["username"]] = obr
+                    elif "first_name" in obr:
                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
 
+
             for r in roots:
                 if r['owner_uuid'] not in objects:
                     contents[r['name']] = r
@@ -634,7 +781,7 @@ class SharedDirectory(Directory):
         try:
             self.merge(contents.items(),
                        lambda i: i[0],
-                       lambda a, i: a.uuid == i[1]['uuid'],
+                       lambda a, i: a.uuid() == i[1]['uuid'],
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
             _logger.exception()