Merge branch '16100-mime-types'
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index 1bfd5177988af6c81adbf091236c187cf2a5d6bf..3a0316cf9e1e48cb319792c1636c56c8fc1eabc2 100644 (file)
@@ -49,6 +49,17 @@ an object that is live in the inode cache, that object is immediately updated.
 
 """
 
+from __future__ import absolute_import
+from __future__ import division
+from future.utils import viewitems
+from future.utils import native
+from future.utils import listvalues
+from future.utils import listitems
+from future import standard_library
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import object
 import os
 import sys
 import llfuse
@@ -71,8 +82,8 @@ import ciso8601
 import collections
 import functools
 import arvados.keep
-
-import Queue
+from prometheus_client import Summary
+import queue
 
 # Default _notify_queue has a limit of 1000 items, but it really needs to be
 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
@@ -80,13 +91,15 @@ import Queue
 
 if hasattr(llfuse, 'capi'):
     # llfuse < 0.42
-    llfuse.capi._notify_queue = Queue.Queue()
+    llfuse.capi._notify_queue = queue.Queue()
 else:
     # llfuse >= 0.42
-    llfuse._notify_queue = Queue.Queue()
+    llfuse._notify_queue = queue.Queue()
+
+LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')  # True when running against an llfuse 0.x release
 
-from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from fusefile import StringFile, FuseArvadosFile
+from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
+from .fusefile import StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
@@ -154,13 +167,32 @@ class InodeCache(object):
 
     def _remove(self, obj, clear):
         if clear:
+            # Kernel behavior seems to be that if a file is
+            # referenced, its parents remain referenced too. This
+            # means has_ref() exits early when a collection is not
+            # candidate for eviction.
+            #
+            # By contrast, in_use() doesn't increment references on
+            # parents, so it requires a full tree walk to determine if
+            # a collection is a candidate for eviction.  This takes
+            # .07s for 240000 files, which becomes a major drag when
+            # cap_cache is being called several times a second and
+            # there are multiple non-evictable collections in the
+            # cache.
+            #
+            # So it is important for performance that we do the
+            # has_ref() check first.
+
+            if obj.has_ref(True):
+                _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
+                return
+
             if obj.in_use():
                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
                 return
-            if obj.has_ref(True):
-                obj.kernel_invalidate()
-                _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
-                return
+
+            obj.kernel_invalidate()
+            _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
             obj.clear()
 
         # The llfuse lock is released in del_entry(), which is called by
@@ -182,7 +214,7 @@ class InodeCache(object):
 
     def cap_cache(self):
         if self._total > self.cap:
-            for ent in self._entries.values():
+            for ent in listvalues(self._entries):
                 if self._total < self.cap or len(self._entries) < self.min_entries:
                     break
                 self._remove(ent, True)
@@ -199,7 +231,8 @@ class InodeCache(object):
                     if obj not in self._by_uuid[obj.cache_uuid]:
                         self._by_uuid[obj.cache_uuid].append(obj)
             self._total += obj.objsize()
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
+                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
             self.cap_cache()
 
     def touch(self, obj):
@@ -238,10 +271,10 @@ class Inodes(object):
         self._entries[key] = item
 
     def __iter__(self):
-        return self._entries.iterkeys()
+        return iter(self._entries.keys())  # keys are inode numbers
 
     def items(self):
-        return self._entries.items()
+        return viewitems(self._entries.items())
 
     def __contains__(self, k):
         return k in self._entries
@@ -264,22 +297,27 @@ class Inodes(object):
             del self._entries[entry.inode]
             with llfuse.lock_released:
                 entry.finalize()
-            self.invalidate_inode(entry.inode)
             entry.inode = None
         else:
             entry.dead = True
             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
 
-    def invalidate_inode(self, inode):
-        llfuse.invalidate_inode(inode)
+    def invalidate_inode(self, entry):
+        if entry.has_ref(False):
+            # Only needed while the kernel still references this inode,
+            # i.e. it has looked it up and has not forgotten it yet.
+            llfuse.invalidate_inode(entry.inode)
 
-    def invalidate_entry(self, inode, name):
-        llfuse.invalidate_entry(inode, name.encode(self.encoding))
+    def invalidate_entry(self, entry, name):
+        if entry.has_ref(False):
+            # Only needed if the kernel still references this inode (looked
+            # up, not yet forgotten); llfuse takes the name as native bytes.
+            llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
 
     def clear(self):
         self.inode_cache.clear()
 
-        for k,v in self._entries.items():
+        for k,v in viewitems(self._entries):
             try:
                 v.finalize()
             except Exception as e:
@@ -324,6 +362,27 @@ class Operations(llfuse.Operations):
 
     """
 
+    fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
+    read_time = fuse_time.labels(op='read')  # one labelled child per FUSE op; each times its handler via @<name>_time.time()
+    write_time = fuse_time.labels(op='write')
+    destroy_time = fuse_time.labels(op='destroy')
+    on_event_time = fuse_time.labels(op='on_event')
+    getattr_time = fuse_time.labels(op='getattr')
+    setattr_time = fuse_time.labels(op='setattr')
+    lookup_time = fuse_time.labels(op='lookup')
+    forget_time = fuse_time.labels(op='forget')
+    open_time = fuse_time.labels(op='open')
+    release_time = fuse_time.labels(op='release')
+    opendir_time = fuse_time.labels(op='opendir')
+    readdir_time = fuse_time.labels(op='readdir')
+    statfs_time = fuse_time.labels(op='statfs')
+    create_time = fuse_time.labels(op='create')
+    mkdir_time = fuse_time.labels(op='mkdir')
+    unlink_time = fuse_time.labels(op='unlink')
+    rmdir_time = fuse_time.labels(op='rmdir')
+    rename_time = fuse_time.labels(op='rename')
+    flush_time = fuse_time.labels(op='flush')
+
     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
         super(Operations, self).__init__()
 
@@ -364,6 +423,28 @@ class Operations(llfuse.Operations):
         # initializing to continue
         self.initlock.set()
 
+    def metric_samples(self):
+        # All samples (e.g. _count, _sum) recorded under fuse_time.
+        return self.fuse_time.collect()[0].samples
+
+    def metric_op_names(self):
+        # Distinct 'op' label values, in the order first seen.
+        return list(collections.OrderedDict.fromkeys(
+            sample.labels['op'] for sample in self.metric_samples()))
+
+    def metric_value(self, opname, metric):
+        # The one sample named `metric` for `opname`, else None.
+        matches = [s.value for s in self.metric_samples()
+                   if s.name == metric and s.labels['op'] == opname]
+        return matches[0] if len(matches) == 1 else None
+
+    def metric_sum_func(self, opname):
+        return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
+
+    def metric_count_func(self, opname):
+        return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
+
+    @destroy_time.time()
     @catch_exceptions
     def destroy(self):
         self._shutdown_started.set()
@@ -371,7 +452,9 @@ class Operations(llfuse.Operations):
             self.events.close()
             self.events = None
 
-        if llfuse.lock.acquire():
+        # Different versions of llfuse require and forbid us to
+        # acquire the lock here. See #8345#note-37, #10805#note-9.
+        if LLFUSE_VERSION_0 and llfuse.lock.acquire():
             # llfuse < 0.42
             self.inodes.clear()
             llfuse.lock.release()
@@ -388,33 +471,38 @@ class Operations(llfuse.Operations):
             [["event_type", "in", ["create", "update", "delete"]]],
             self.on_event)
 
+    @on_event_time.time()
     @catch_exceptions
     def on_event(self, ev):
-        if 'event_type' not in ev:
+        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
             return
         with llfuse.lock:
-            new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
-            pdh = new_attrs.get("portable_data_hash")
-            # new_attributes.modified_at currently lacks
-            # subsecond precision (see #6347) so use event_at
-            # which should always be the same.
-            stamp = ev.get("event_at")
+            properties = ev.get("properties") or {}
+            old_attrs = properties.get("old_attributes") or {}
+            new_attrs = properties.get("new_attributes") or {}
 
             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
-                if stamp and pdh and ev.get("object_kind") == "arvados#collection":
-                    item.update(to_record_version=(stamp, pdh))
-                else:
-                    item.update()
-
-            oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+                if ev.get("object_kind") == "arvados#collection":
+                    pdh = new_attrs.get("portable_data_hash")
+                    # Use event_at rather than new_attributes.modified_at,
+                    # which currently lacks subsecond precision (see #6347);
+                    # the two timestamps should otherwise always agree.
+                    stamp = ev.get("event_at")
+                    if (stamp and pdh and item.writable() and
+                        item.collection is not None and
+                        item.collection.modified() and
+                        new_attrs.get("is_trashed") is not True):
+                        item.update(to_record_version=(stamp, pdh))
+
+            oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
                     self.inodes.inode_cache.find_by_uuid(oldowner) +
                     self.inodes.inode_cache.find_by_uuid(newowner)):
-                parent.invalidate()
-                parent.update()
+                parent.child_event(ev)
 
+    @getattr_time.time()
     @catch_exceptions
     def getattr(self, inode, ctx=None):
         if inode not in self.inodes:
@@ -425,8 +513,8 @@ class Operations(llfuse.Operations):
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
-        entry.entry_timeout = 60 if e.allow_dirent_cache else 0
-        entry.attr_timeout = 60 if e.allow_attr_cache else 0
+        entry.entry_timeout = 0
+        entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         if isinstance(e, Directory):
@@ -447,7 +535,7 @@ class Operations(llfuse.Operations):
         entry.st_size = e.size()
 
         entry.st_blksize = 512
-        entry.st_blocks = (entry.st_size/512)+1
+        entry.st_blocks = (entry.st_size // 512) + 1
         if hasattr(entry, 'st_atime_ns'):
             # llfuse >= 0.42
             entry.st_atime_ns = int(e.atime() * 1000000000)
@@ -461,6 +549,7 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @setattr_time.time()
     @catch_exceptions
     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
         entry = self.getattr(inode)
@@ -484,9 +573,10 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @lookup_time.time()
     @catch_exceptions
     def lookup(self, parent_inode, name, ctx=None):
-        name = unicode(name, self.inodes.encoding)
+        name = str(name, self.inodes.encoding)
         inode = None
 
         if name == '.':
@@ -510,6 +600,7 @@ class Operations(llfuse.Operations):
                       parent_inode, name)
             raise llfuse.FUSEError(errno.ENOENT)
 
+    @forget_time.time()
     @catch_exceptions
     def forget(self, inodes):
         if self._shutdown_started.is_set():
@@ -520,6 +611,7 @@ class Operations(llfuse.Operations):
             if ent.dec_ref(nlookup) == 0 and ent.dead:
                 self.inodes.del_entry(ent)
 
+    @open_time.time()
     @catch_exceptions
     def open(self, inode, flags, ctx=None):
         if inode in self.inodes:
@@ -555,6 +647,7 @@ class Operations(llfuse.Operations):
 
         return fh
 
+    @read_time.time()
     @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
@@ -572,6 +665,7 @@ class Operations(llfuse.Operations):
             self.read_counter.add(len(r))
         return r
 
+    @write_time.time()
     @catch_exceptions
     def write(self, fh, off, buf):
         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
@@ -592,6 +686,7 @@ class Operations(llfuse.Operations):
             self.write_counter.add(w)
         return w
 
+    @release_time.time()
     @catch_exceptions
     def release(self, fh):
         if fh in self._filehandles:
@@ -608,6 +703,7 @@ class Operations(llfuse.Operations):
     def releasedir(self, fh):
         self.release(fh)
 
+    @opendir_time.time()
     @catch_exceptions
     def opendir(self, inode, ctx=None):
         _logger.debug("arv-mount opendir: inode %i", inode)
@@ -628,10 +724,10 @@ class Operations(llfuse.Operations):
 
         # update atime
         self.inodes.touch(p)
-
-        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
         return fh
 
+    @readdir_time.time()
     @catch_exceptions
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -647,6 +743,7 @@ class Operations(llfuse.Operations):
                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
             e += 1
 
+    @statfs_time.time()
     @catch_exceptions
     def statfs(self, ctx=None):
         st = llfuse.StatvfsData()
@@ -680,8 +777,10 @@ class Operations(llfuse.Operations):
 
         return p
 
+    @create_time.time()
     @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
@@ -696,8 +795,10 @@ class Operations(llfuse.Operations):
         f.inc_ref()
         return (fh, self.getattr(f.inode))
 
+    @mkdir_time.time()
     @catch_exceptions
     def mkdir(self, inode_parent, name, mode, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
@@ -709,25 +810,33 @@ class Operations(llfuse.Operations):
         d.inc_ref()
         return self.getattr(d.inode)
 
+    @unlink_time.time()  # records call duration in fuse_time under op='unlink'
     @catch_exceptions
     def unlink(self, inode_parent, name, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)  # name arrives as bytes
         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
 
+    @rmdir_time.time()  # records call duration in fuse_time under op='rmdir'
     @catch_exceptions
     def rmdir(self, inode_parent, name, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)  # name arrives as bytes
         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
 
+    @rename_time.time()  # records call duration in fuse_time under op='rename'
     @catch_exceptions
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
+        name_old = name_old.decode(encoding=self.inodes.encoding)  # both names arrive as bytes
+        name_new = name_new.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
         src = self._check_writable(inode_parent_old)
         dest = self._check_writable(inode_parent_new)
         dest.rename(name_old, name_new, src)
 
+    @flush_time.time()
     @catch_exceptions
     def flush(self, fh):
         if fh in self._filehandles: