Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index 21961a56f41d8776133173565edfad030c46444c..becd66975f676a76b09eb8da6761582fd2e94b5e 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 import logging
 import re
 import time
@@ -8,8 +12,9 @@ import functools
 import threading
 from apiclient import errors as apiclient_errors
 import errno
+import time
 
-from fusefile import StringFile, ObjectFile, FuseArvadosFile
+from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
 from fresh import FreshBase, convertTime, use_counter, check_update
 
 import arvados.collection
@@ -145,34 +150,60 @@ class Directory(FreshBase):
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
-            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
+            self.inodes.invalidate_entry(self, i)
             self.inodes.del_entry(oldentries[i])
             changed = True
 
         if changed:
-            self.inodes.invalidate_inode(self.inode)
+            self.inodes.invalidate_inode(self)
             self._mtime = time.time()
 
         self.fresh()
 
-    def clear(self, force=False):
-        """Delete all entries"""
+    def in_use(self):
+        """Return True if this directory, or any entry in it, is in use."""
+        if super(Directory, self).in_use():
+            return True
+        for v in self._entries.itervalues():
+            if v.in_use():
+                return True
+        return False
 
-        if not self.in_use() or force:
-            oldentries = self._entries
-            self._entries = {}
-            for n in oldentries:
-                if not oldentries[n].clear(force):
-                    self._entries = oldentries
-                    return False
-            for n in oldentries:
-                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
-                self.inodes.del_entry(oldentries[n])
-            self.inodes.invalidate_inode(self.inode)
-            self.invalidate()
+    def has_ref(self, only_children):
+        """Return True if this directory or any of its entries has a reference.
+
+        only_children is forwarded to the base-class check; the entries
+        themselves are always queried with only_children=False.
+        """
+        if super(Directory, self).has_ref(only_children):
             return True
-        else:
-            return False
+        for v in self._entries.itervalues():
+            if v.has_ref(False):
+                return True
+        return False
+
+    def clear(self):
+        """Delete all entries"""
+        oldentries = self._entries
+        self._entries = {}
+        for n in oldentries:
+            oldentries[n].clear()
+            self.inodes.del_entry(oldentries[n])
+        self.invalidate()
+
+    def kernel_invalidate(self):
+        """Invalidate the parent's directory entry that refers to this directory."""
+        # Invalidating the dentry on the parent implies invalidating all paths
+        # below it as well.
+        parent = self.inodes[self.parent_inode]
+
+        # Find self on the parent in order to invalidate this path.
+        # Calling the public items() method might trigger a refresh,
+        # which we definitely don't want, so read the internal dict directly.
+        for k,v in parent._entries.items():
+            if v is self:
+                self.inodes.invalidate_entry(parent, k)
+                break
 
     def mtime(self):
         return self._mtime
@@ -183,6 +207,10 @@ class Directory(FreshBase):
     def flush(self):
         pass
 
+    def want_event_subscribe(self):
+        """Not answered by the base class; subclasses must override this."""
+        raise NotImplementedError()
+
     def create(self, name):
         raise NotImplementedError()
 
@@ -244,13 +271,13 @@ class CollectionDirectoryBase(Directory):
                 elif event == arvados.collection.DEL:
                     ent = self._entries[name]
                     del self._entries[name]
-                    self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
+                    self.inodes.invalidate_entry(self, name)
                     self.inodes.del_entry(ent)
                 elif event == arvados.collection.MOD:
                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
-                        self.inodes.invalidate_inode(item.fuse_entry.inode)
+                        self.inodes.invalidate_inode(item.fuse_entry)
                     elif name in self._entries:
-                        self.inodes.invalidate_inode(self._entries[name].inode)
+                        self.inodes.invalidate_inode(self._entries[name])
 
     def populate(self, mtime):
         self._mtime = mtime
@@ -316,6 +343,11 @@ class CollectionDirectoryBase(Directory):
         self.flush()
         src.flush()
 
+    def clear(self):
+        """Clear the entries via the base class, then drop the collection reference."""
+        super(CollectionDirectoryBase, self).clear()
+        self.collection = None
+
 
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
@@ -326,6 +357,18 @@ class CollectionDirectory(CollectionDirectoryBase):
         self.num_retries = num_retries
         self.collection_record_file = None
         self.collection_record = None
+        self._poll = True
+        # Poll interval is half of the blobSignatureTtl advertised in the
+        # discovery document (a 2 hour TTL, i.e. 1 hour polling, is assumed
+        # when the document does not provide one).
+        try:
+            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
+        except Exception as e:
+            # Best effort: an unusable discovery document must not prevent the
+            # directory from being created.  type(e) is the exception class.
+            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", type(e))
+            self._poll_time = 60*60
+
         if isinstance(collection_record, dict):
             self.collection_locator = collection_record['uuid']
             self._mtime = convertTime(collection_record.get('modified_at'))
@@ -343,6 +381,10 @@ class CollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return self.collection.writable() if self.collection is not None else self._writable
 
+    def want_event_subscribe(self):
+        """Return True only if collection_locator matches uuid_pattern."""
+        return (uuid_pattern.match(self.collection_locator) is not None)
+
     # Used by arv-web.py to switch the contents of the CollectionDirectory
     def change_collection(self, new_locator):
         """Switch the contents of the CollectionDirectory.
@@ -356,7 +397,7 @@ class CollectionDirectory(CollectionDirectoryBase):
 
     def new_collection(self, new_collection_record, coll_reader):
         if self.inode:
-            self.clear(force=True)
+            self.clear()
 
         self.collection_record = new_collection_record
 
@@ -388,7 +429,7 @@ class CollectionDirectory(CollectionDirectoryBase):
                     if not self.stale():
                         return
 
-                    _logger.debug("Updating %s", to_record_version)
+                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                     if self.collection is not None:
                         if self.collection.known_past_version(to_record_version):
                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
@@ -434,6 +475,7 @@ class CollectionDirectory(CollectionDirectoryBase):
             _logger.exception("arv-mount %s: error", self.collection_locator)
             if self.collection_record is not None and "manifest_text" in self.collection_record:
                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+        self.invalidate()
         return False
 
     @use_counter
@@ -473,6 +515,15 @@ class CollectionDirectory(CollectionDirectoryBase):
                 self.collection.save()
             self.collection.stop_threads()
 
+    def clear(self):
+        """Stop the collection's threads, clear all entries, reset the manifest size."""
+        # stop_threads() has to come first: the base-class clear() sets
+        # self.collection to None.
+        if self.collection is not None:
+            self.collection.stop_threads()
+        super(CollectionDirectory, self).clear()
+        self._manifest_size = 0
+
 
 class TmpCollectionDirectory(CollectionDirectoryBase):
     """A directory backed by an Arvados collection that never gets saved.
@@ -483,43 +531,40 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     job output.
     """
 
+    class UnsaveableCollection(arvados.collection.Collection):
+        """Collection whose save() and save_new() are no-ops."""
+        def save(self):
+            pass
+        def save_new(self):
+            pass
+
     def __init__(self, parent_inode, inodes, api_client, num_retries):
-        collection = arvados.collection.Collection(
+        collection = self.UnsaveableCollection(
             api_client=api_client,
-            keep_client=api_client.keep)
-        collection.save = self._commit_collection
-        collection.save_new = self._commit_collection
+            keep_client=api_client.keep,
+            num_retries=num_retries)
         super(TmpCollectionDirectory, self).__init__(
             parent_inode, inodes, collection)
         self.collection_record_file = None
-        self._subscribed = False
-        self._update_collection_record()
-
-    def update(self, *args, **kwargs):
-        if not self._subscribed:
-            with llfuse.lock_released:
-                self.populate(self.mtime())
-            self._subscribed = True
+        self.populate(self.mtime())
 
-    @use_counter
-    def _commit_collection(self):
-        """Commit the data blocks, but don't save the collection to API.
+    def on_event(self, *args, **kwargs):
+        """Forward the event to the base class, then invalidate the record file if it exists."""
+        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
+        if self.collection_record_file:
+            with llfuse.lock:
+                self.collection_record_file.invalidate()
+            self.inodes.invalidate_inode(self.collection_record_file)
+            _logger.debug("%s invalidated collection record", self)
 
-        Update the content of the special .arvados#collection file, if
-        it has been instantiated.
-        """
-        self.collection.flush()
-        self._update_collection_record()
-        if self.collection_record_file is not None:
-            self.collection_record_file.update(self.collection_record)
-            self.inodes.invalidate_inode(self.collection_record_file.inode)
-
-    def _update_collection_record(self):
-        self.collection_record = {
-            "uuid": None,
-            "manifest_text": self.collection.manifest_text(),
-            "portable_data_hash": self.collection.portable_data_hash(),
-        }
+    def collection_record(self):
+        """Build a record dict from the collection's current manifest, with the llfuse lock released."""
+        with llfuse.lock_released:
+            return {
+                "uuid": None,
+                "manifest_text": self.collection.manifest_text(),
+                "portable_data_hash": self.collection.portable_data_hash(),
+            }
 
     def __contains__(self, k):
         return (k == '.arvados#collection' or
@@ -529,18 +571,32 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def __getitem__(self, item):
         if item == '.arvados#collection':
             if self.collection_record_file is None:
+                # The collection_record method itself is passed, uncalled;
+                # presumably FuncToJSONFile calls it on demand -- TODO confirm.
-                self.collection_record_file = ObjectFile(
+                self.collection_record_file = FuncToJSONFile(
                     self.inode, self.collection_record)
                 self.inodes.add_entry(self.collection_record_file)
             return self.collection_record_file
         return super(TmpCollectionDirectory, self).__getitem__(item)
 
+    def persisted(self):
+        return False
+
     def writable(self):
         return True
 
+    def want_event_subscribe(self):
+        return False
+
     def finalize(self):
         self.collection.stop_threads()
 
+    def invalidate(self):
+        """Invalidate the record file, if it has been created, then this directory."""
+        if self.collection_record_file:
+            self.collection_record_file.invalidate()
+        super(TmpCollectionDirectory, self).invalidate()
+
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
@@ -556,12 +609,13 @@ class MagicDirectory(Directory):
     README_TEXT = """
 This directory provides access to Arvados collections as subdirectories listed
 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
-the form '1234567890abcdefghijklmnopqrstuv+123').
+the form '1234567890abcdef0123456789abcdef+123').
 
 Note that this directory will appear empty until you attempt to access a
 specific collection subdirectory (such as trying to 'cd' into it), at which
 point the collection will actually be looked up on the server and the directory
 will appear if it exists.
+
 """.lstrip()
 
     def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
@@ -590,6 +644,7 @@ will appear if it exists.
             return False
 
         try:
+            e = None
             e = self.inodes.add_entry(CollectionDirectory(
                     self.inode, self.inodes, self.api, self.num_retries, k))
 
@@ -600,11 +655,15 @@ will appear if it exists.
                     self.inodes.del_entry(e)
                 return True
             else:
+                self.inodes.invalidate_entry(self, k)
                 self.inodes.del_entry(e)
                 return False
-        except Exception as e:
-            _logger.debug('arv-mount exception keep %s', e)
-            self.inodes.del_entry(e)
+        except Exception:
+            _logger.exception("arv-mount lookup '%s':", k)
+            # e is still None when the failure happened before add_entry()
+            # returned, in which case there is nothing to delete.
+            if e is not None:
+                self.inodes.del_entry(e)
             return False
 
     def __getitem__(self, item):
@@ -613,21 +670,14 @@ will appear if it exists.
         else:
             raise KeyError("No collection with id " + item)
 
-    def clear(self, force=False):
+    def clear(self):
         pass
 
+    def want_event_subscribe(self):
+        return not self.pdh_only
 
-class RecursiveInvalidateDirectory(Directory):
-    def invalidate(self):
-        try:
-            super(RecursiveInvalidateDirectory, self).invalidate()
-            for a in self._entries:
-                self._entries[a].invalidate()
-        except Exception:
-            _logger.exception()
 
-
-class TagsDirectory(RecursiveInvalidateDirectory):
+class TagsDirectory(Directory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
@@ -636,20 +686,54 @@ class TagsDirectory(RecursiveInvalidateDirectory):
         self.num_retries = num_retries
         self._poll = True
         self._poll_time = poll_time
+        # Tag names found by direct lookup in __getitem__; update() merges
+        # them in alongside the tags returned by the list call.
+        self._extra = set()
+
+    def want_event_subscribe(self):
+        return True
 
     @use_counter
     def update(self):
         with llfuse.lock_released:
             tags = self.api.links().list(
-                filters=[['link_class', '=', 'tag']],
-                select=['name'], distinct=True
+                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
+                select=['name'], distinct=True, limit=1000
                 ).execute(num_retries=self.num_retries)
         if "items" in tags:
-            self.merge(tags['items'],
+            self.merge(tags['items']+[{"name": n} for n in self._extra],
                        lambda i: i['name'],
                        lambda a, i: a.tag == i['name'],
                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
 
+    @use_counter
+    @check_update
+    def __getitem__(self, item):
+        """Return the entry for tag `item`, querying the API for unlisted tags."""
+        if super(TagsDirectory, self).__contains__(item):
+            return super(TagsDirectory, self).__getitem__(item)
+        with llfuse.lock_released:
+            tags = self.api.links().list(
+                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
+            ).execute(num_retries=self.num_retries)
+        if tags["items"]:
+            self._extra.add(item)
+            self.update()
+        return super(TagsDirectory, self).__getitem__(item)
+
+    @use_counter
+    @check_update
+    def __contains__(self, k):
+        """Return True if tag `k` is listed or can be found via __getitem__."""
+        if super(TagsDirectory, self).__contains__(k):
+            return True
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
+        return False
+
 
 class TagDirectory(Directory):
     """A special directory that contains as subdirectories all collections visible
@@ -665,6 +745,9 @@ class TagDirectory(Directory):
         self._poll = poll
         self._poll_time = poll_time
 
+    def want_event_subscribe(self):
+        return True
+
     @use_counter
     def update(self):
         with llfuse.lock_released:
@@ -695,6 +778,10 @@ class ProjectDirectory(Directory):
         self._poll_time = poll_time
         self._updating_lock = threading.Lock()
         self._current_user = None
+        self._full_listing = False
+
+    def want_event_subscribe(self):
+        return True
 
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
@@ -714,27 +801,46 @@ class ProjectDirectory(Directory):
     def uuid(self):
         return self.project_uuid
 
+    def items(self):
+        self._full_listing = True
+        return super(ProjectDirectory, self).items()
+
+    def namefn(self, i):
+        """Return the directory entry name for record `i`, or None.
+
+        Collections, subprojects and name links to collections use the
+        record's name as-is; any other record with an 'arvados#...' kind
+        gets the kind (minus the 'arvados#' prefix) appended as a suffix.
+        None means no entry: the name is missing or empty, or the record
+        is not recognized.
+        """
+        name = i.get('name')
+        if not name:
+            return None
+        # Missing or null fields count as "no match" instead of raising:
+        # child_event() passes partial attribute dicts taken from events.
+        uuid = i.get('uuid') or ''
+        if collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid):
+            # collection or subproject
+            return name
+        if link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
+            # name link
+            return name
+        kind = i.get('kind') or ''
+        if kind.startswith('arvados#'):
+            # something else
+            return "{}.{}".format(name, kind[8:])
+        return None
+
+
     @use_counter
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
             self.inodes.add_entry(self.project_object_file)
 
-        def namefn(i):
-            if 'name' in i:
-                if i['name'] is None or len(i['name']) == 0:
-                    return None
-                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
-                    # collection or subproject
-                    return i['name']
-                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
-                    # name link
-                    return i['name']
-                elif 'kind' in i and i['kind'].startswith('arvados#'):
-                    # something else
-                    return "{}.{}".format(i['name'], i['kind'][8:])
-            else:
-                return None
+        if not self._full_listing:
+            return
 
         def samefn(a, i):
             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
@@ -756,31 +851,69 @@ class ProjectDirectory(Directory):
                     self.project_object = self.api.users().get(
                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
 
-                contents = arvados.util.list_all(self.api.groups().contents,
-                                                 self.num_retries, uuid=self.project_uuid)
+                contents = arvados.util.list_all(self.api.groups().list,
+                                                 self.num_retries,
+                                                 filters=[["owner_uuid", "=", self.project_uuid],
+                                                          ["group_class", "=", "project"]])
+                contents.extend(arvados.util.list_all(self.api.collections().list,
+                                                      self.num_retries,
+                                                      filters=[["owner_uuid", "=", self.project_uuid]]))
 
             # end with llfuse.lock_released, re-acquire lock
 
             self.merge(contents,
-                       namefn,
+                       self.namefn,
                        samefn,
                        self.createDirectory)
         finally:
             self._updating_lock.release()
 
+    def _add_entry(self, i, name):
+        """Create the entry for record `i`, register it with inodes, store it as `name`."""
+        ent = self.createDirectory(i)
+        self._entries[name] = self.inodes.add_entry(ent)
+        return self._entries[name]
+
     @use_counter
     @check_update
-    def __getitem__(self, item):
-        if item == '.arvados#project':
+    def __getitem__(self, k):
+        """Look up entry `k`, querying the API for it if necessary.
+
+        Until a full listing has been requested, an unknown name is looked
+        up by name among the project's subprojects, then its collections.
+        """
+        if k == '.arvados#project':
             return self.project_object_file
-        else:
-            return super(ProjectDirectory, self).__getitem__(item)
+        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+            return super(ProjectDirectory, self).__getitem__(k)
+        with llfuse.lock_released:
+            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                       ["group_class", "=", "project"],
+                                                       ["name", "=", k]],
+                                              limit=1).execute(num_retries=self.num_retries)["items"]
+            if not contents:
+                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                ["name", "=", k]],
+                                                       limit=1).execute(num_retries=self.num_retries)["items"]
+        if contents:
+            name = sanitize_filename(self.namefn(contents[0]))
+            if name != k:
+                raise KeyError(k)
+            return self._add_entry(contents[0], name)
+
+        # Didn't find item
+        raise KeyError(k)
 
     def __contains__(self, k):
         if k == '.arvados#project':
             return True
-        else:
-            return super(ProjectDirectory, self).__contains__(k)
+        # Delegates to __getitem__, which may query the API for unlisted names.
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
+        return False
 
     @use_counter
     @check_update
@@ -844,7 +970,57 @@ class ProjectDirectory(Directory):
         # Acually move the entry from source directory to this directory.
         del src._entries[name_old]
         self._entries[name_new] = ent
-        self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
+        self.inodes.invalidate_entry(src, name_old)
+
+    @use_counter
+    def child_event(self, ev):
+        """Update this directory's entries for an event about a child object.
+
+        Reads the object's uuid and current owner from `ev`, and its old and
+        new attributes from ev["properties"], then removes, renames or adds
+        the corresponding entry as needed.
+        """
+        properties = ev.get("properties") or {}
+        old_attrs = properties.get("old_attributes") or {}
+        new_attrs = properties.get("new_attributes") or {}
+        old_attrs["uuid"] = ev["object_uuid"]
+        new_attrs["uuid"] = ev["object_uuid"]
+        old_name = sanitize_filename(self.namefn(old_attrs))
+        new_name = sanitize_filename(self.namefn(new_attrs))
+
+        # create events will have a new name, but not an old name
+        # delete events will have an old name, but not a new name
+        # update events will have an old and new name, and they may be same or different
+        # if they are the same, an unrelated field changed and there is nothing to do.
+
+        if old_attrs.get("owner_uuid") != self.project_uuid:
+            # Was moved from somewhere else, so don't try to remove entry.
+            old_name = None
+        if ev.get("object_owner_uuid") != self.project_uuid:
+            # Was moved to somewhere else, so don't try to add entry
+            new_name = None
+
+        if old_attrs.get("is_trashed"):
+            # Was previously deleted
+            old_name = None
+        if new_attrs.get("is_trashed"):
+            # Has been deleted
+            new_name = None
+
+        if new_name != old_name:
+            ent = None
+            if old_name in self._entries:
+                ent = self._entries[old_name]
+                del self._entries[old_name]
+                self.inodes.invalidate_entry(self, old_name)
+
+            if new_name:
+                if ent is not None:
+                    self._entries[new_name] = ent
+                else:
+                    self._add_entry(new_attrs, new_name)
+            elif ent is not None:
+                self.inodes.del_entry(ent)
 
 
 class SharedDirectory(Directory):
@@ -916,3 +1086,7 @@ class SharedDirectory(Directory):
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
-            _logger.exception()
+            _logger.exception("arv-mount shared dir error")
+
+    def want_event_subscribe(self):
+        """Always True for the shared directory."""
+        return True