1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before returning the requested data.
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
77 from prometheus_client import Summary
# Default _notify_queue has a limit of 1000 items, but it really needs to be
# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# more details.
if hasattr(llfuse, 'capi'):
    # Older llfuse releases expose a 'capi' submodule holding the queue.
    llfuse.capi._notify_queue = queue.Queue()
    # NOTE(review): an "else:" line appears elided in this copy; newer llfuse
    # keeps the queue directly on the llfuse module, as the next line shows.
    llfuse._notify_queue = queue.Queue()

# True when the installed llfuse is a 0.x release; some lock handling below
# depends on this.
LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
93 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
94 from .fusefile import StringFile, FuseArvadosFile
# Module-level logger shared by all classes in this file.
_logger = logging.getLogger('arvados.arvados_fuse')

# Uncomment this to enable llfuse debug logging.
# log_handler = logging.StreamHandler()
# llogger = logging.getLogger('llfuse')
# llogger.addHandler(log_handler)
# llogger.setLevel(logging.DEBUG)
class Handle(object):
    """Connects a numeric file handle to a File or Directory object that has
    been opened by the client."""

    def __init__(self, fh, obj):
        # NOTE(review): the initializer body (and any other methods of this
        # class) appear elided in this copy of the file.
class FileHandle(Handle):
    """Connects a numeric file handle to a File object that has
    been opened by the client."""

    # NOTE(review): the enclosing "def flush(self):" line appears elided in
    # this copy; the statements below flush the file if it was writable.
        if self.obj.writable():
            return self.obj.flush()
class DirectoryHandle(Handle):
    """Connects a numeric file handle to a Directory object that has
    been opened by the client."""

    def __init__(self, fh, dirobj, entries):
        super(DirectoryHandle, self).__init__(fh, dirobj)
        # Snapshot of the directory contents taken when the handle was opened.
        self.entries = entries
class InodeCache(object):
    """Records the memory footprint of objects and when they are last used.

    When the cache limit is exceeded, the least recently used objects are
    cleared. Clearing the object means discarding its contents to release
    memory. The next time the object is accessed, it must be re-fetched from
    the server. Note that the inode cache limit is a soft limit; the cache
    limit may be exceeded if necessary to load very large objects, it may also
    be exceeded if open file handles prevent objects from being cleared.
    """

    def __init__(self, cap, min_entries=4):
        # Insertion order of this OrderedDict is the LRU eviction order.
        self._entries = collections.OrderedDict()
        # NOTE(review): the assignments initializing the uuid index, the cap
        # and the running total appear elided in this copy.
        # Never evict below this many entries, regardless of total size.
        self.min_entries = min_entries
    def _remove(self, obj, clear):
        # Evict *obj* from the cache; when "clear" is true, also discard its
        # contents.  NOTE(review): several guard/return lines of this method
        # appear elided in this copy; indentation below is best-effort.

        # Kernel behavior seems to be that if a file is
        # referenced, its parents remain referenced too. This
        # means has_ref() exits early when a collection is not
        # candidate for eviction.

        # By contrast, in_use() doesn't increment references on
        # parents, so it requires a full tree walk to determine if
        # a collection is a candidate for eviction. This takes
        # .07s for 240000 files, which becomes a major drag when
        # cap_cache is being called several times a second and
        # there are multiple non-evictable collections in the
        # cache.

        # So it is important for performance that we do the
        # has_ref() check first.

        if obj.has_ref(True):
            _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
            # NOTE(review): an early return and the in_use() guard appear
            # elided before the next debug line.
            _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)

        obj.kernel_invalidate()
        _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)

        # The llfuse lock is released in del_entry(), which is called by
        # Directory.clear(). While the llfuse lock is released, it can happen
        # that a reentrant call removes this entry before this call gets to it.
        # Ensure that the entry is still valid before trying to remove it.
        if obj.inode not in self._entries:
            # NOTE(review): the "return" of this branch appears elided here.

        self._total -= obj.cache_size
        del self._entries[obj.inode]
        # NOTE(review): the "if obj.cache_uuid:" guard appears elided before
        # the uuid-index cleanup below.
        self._by_uuid[obj.cache_uuid].remove(obj)
        if not self._by_uuid[obj.cache_uuid]:
            del self._by_uuid[obj.cache_uuid]
        obj.cache_uuid = None
        _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
    # NOTE(review): the enclosing "def cap_cache(self):" line appears elided
    # in this copy.  This is the eviction loop: walk entries in LRU order and
    # clear them until the total drops under the cap (keeping min_entries).
        if self._total > self.cap:
            for ent in listvalues(self._entries):
                if self._total < self.cap or len(self._entries) < self.min_entries:
                    # NOTE(review): a "break" appears elided here.
                self._remove(ent, True)
    def manage(self, obj):
        # Start tracking *obj*: record its size, index it by inode and uuid,
        # and grow the running total.  NOTE(review): the "if obj.persisted():"
        # guard and an "else:" line appear elided in this copy.
        obj.cache_size = obj.objsize()
        self._entries[obj.inode] = obj
        obj.cache_uuid = obj.uuid()
        # NOTE(review): the "if obj.cache_uuid:" guard appears elided here.
        if obj.cache_uuid not in self._by_uuid:
            self._by_uuid[obj.cache_uuid] = [obj]
            # NOTE(review): an "else:" line appears elided here.
            if obj not in self._by_uuid[obj.cache_uuid]:
                self._by_uuid[obj.cache_uuid].append(obj)
        self._total += obj.objsize()
        _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
                      obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
    def touch(self, obj):
        # Move *obj* to the most-recently-used position: drop it (without
        # clearing contents) and re-add.  NOTE(review): the persisted() guard
        # and the re-managing call appear elided in this copy.
        if obj.inode in self._entries:
            self._remove(obj, False)
236 def unmanage(self, obj):
237 if obj.persisted() and obj.inode in self._entries:
238 self._remove(obj, True)
240 def find_by_uuid(self, uuid):
241 return self._by_uuid.get(uuid, [])
    # NOTE(review): the enclosing "def clear(self):" line (and any reset of
    # the running total) appears elided in this copy.  Drops every tracked
    # entry and the uuid index.
        self._entries.clear()
        self._by_uuid.clear()
class Inodes(object):
    """Manage the set of inodes. This is the mapping from a numeric id
    to a concrete File or Directory object"""

    def __init__(self, inode_cache, encoding="utf-8"):
        # NOTE(review): the line initializing the inode->object mapping
        # (self._entries) appears elided in this copy.
        # Inode numbers are handed out sequentially starting at the FUSE root.
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.inode_cache = inode_cache
        # Filesystem name encoding used when talking to the kernel.
        self.encoding = encoding
        self.deferred_invalidations = []
259 def __getitem__(self, item):
260 return self._entries[item]
262 def __setitem__(self, key, item):
263 self._entries[key] = item
    # NOTE(review): the enclosing "def __iter__(self):" line appears elided in
    # this copy; iterating an Inodes yields the inode numbers.
        return iter(self._entries.keys())

    # NOTE(review): the enclosing "def items(self):" line appears elided in
    # this copy; yields (inode, object) pairs.
        return viewitems(self._entries.items())
271 def __contains__(self, k):
272 return k in self._entries
274 def touch(self, entry):
275 entry._atime = time.time()
276 self.inode_cache.touch(entry)
    def add_entry(self, entry):
        # Assign the next free inode number to *entry* and register it.
        entry.inode = next(self._counter)
        if entry.inode == llfuse.ROOT_INODE:
            # NOTE(review): the body of this branch, and the trailing
            # "return entry", appear elided in this copy.
        self._entries[entry.inode] = entry
        self.inode_cache.manage(entry)
    def del_entry(self, entry):
        # Remove *entry* outright when the kernel holds no references;
        # otherwise (elided "else:" branch) it is only marked for later reaping.
        if entry.ref_count == 0:
            self.inode_cache.unmanage(entry)
            del self._entries[entry.inode]
            with llfuse.lock_released:
                # NOTE(review): the finalization statements of this "with"
                # block, and the "else:" branch preceding the debug call
                # below, appear elided in this copy.
            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
297 def invalidate_inode(self, entry):
298 if entry.has_ref(False):
299 # Only necessary if the kernel has previously done a lookup on this
300 # inode and hasn't yet forgotten about it.
301 llfuse.invalidate_inode(entry.inode)
303 def invalidate_entry(self, entry, name):
304 if entry.has_ref(False):
305 # Only necessary if the kernel has previously done a lookup on this
306 # inode and hasn't yet forgotten about it.
307 llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
    # NOTE(review): the enclosing "def clear(self):" line and several
    # statements of this method appear elided in this copy.  Tears down the
    # cache and finalizes every inode, logging (not raising) failures.
        self.inode_cache.clear()

        for k,v in viewitems(self._entries):
            # NOTE(review): the "try:" and per-entry finalize call appear
            # elided before the handler below.
            except Exception as e:
                _logger.exception("Error during finalize of inode %i", k)

        self._entries.clear()
def catch_exceptions(orig_func):
    """Catch uncaught exceptions and log them consistently.

    Decorator for FUSE request handlers: translates Python exceptions into
    the errno-carrying llfuse.FUSEError that the kernel expects.
    """

    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        # NOTE(review): the "try:" line appears elided in this copy.
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            # Already in the shape llfuse expects.
            # NOTE(review): the bare "raise" of this handler appears elided.
        except EnvironmentError as e:
            # OS errors map directly onto FUSE errnos.
            raise llfuse.FUSEError(e.errno)
        except NotImplementedError:
            raise llfuse.FUSEError(errno.ENOTSUP)
        except arvados.errors.KeepWriteError as e:
            _logger.error("Keep write error: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
        except arvados.errors.NotFoundError as e:
            _logger.error("Block not found error: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
        # NOTE(review): the catch-all "except Exception:" line appears elided
        # before the two statements below.
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
class Operations(llfuse.Operations):
    """This is the main interface with llfuse.

    The methods on this object are called by llfuse threads to service FUSE
    events to query and read from the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using 'with llfuse.lock_released:'
    """

    # One Prometheus Summary shared by all FUSE operations, distinguished by
    # the 'op' label; the per-op .labels(...) objects below serve as timing
    # decorators (e.g. @on_event_time.time()) on the request handlers.
    fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
    read_time = fuse_time.labels(op='read')
    write_time = fuse_time.labels(op='write')
    destroy_time = fuse_time.labels(op='destroy')
    on_event_time = fuse_time.labels(op='on_event')
    getattr_time = fuse_time.labels(op='getattr')
    setattr_time = fuse_time.labels(op='setattr')
    lookup_time = fuse_time.labels(op='lookup')
    forget_time = fuse_time.labels(op='forget')
    open_time = fuse_time.labels(op='open')
    release_time = fuse_time.labels(op='release')
    opendir_time = fuse_time.labels(op='opendir')
    readdir_time = fuse_time.labels(op='readdir')
    statfs_time = fuse_time.labels(op='statfs')
    create_time = fuse_time.labels(op='create')
    mkdir_time = fuse_time.labels(op='mkdir')
    unlink_time = fuse_time.labels(op='unlink')
    rmdir_time = fuse_time.labels(op='rmdir')
    rename_time = fuse_time.labels(op='rename')
    flush_time = fuse_time.labels(op='flush')
    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
        super(Operations, self).__init__()

        self._api_client = api_client

        # NOTE(review): the uid/gid assignments and the "if not inode_cache:"
        # guard appear elided in this copy; the default cache below caps the
        # inode cache at 256 MiB.
            inode_cache = InodeCache(cap=256*1024*1024)
        self.inodes = Inodes(inode_cache, encoding=encoding)

        # False makes the entire mount read-only.
        self.enable_write = enable_write

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = itertools.count(0)

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

        # If we get overlapping shutdown events (e.g., fusermount -u
        # -z and operations.destroy()) llfuse calls forget() on inodes
        # that have already been deleted. To avoid this, we make
        # forget() a no-op if called after destroy().
        self._shutdown_started = threading.Event()

        self.num_retries = num_retries

        # Byte/operation counters surfaced for mount statistics.
        self.read_counter = arvados.keep.Counter()
        self.write_counter = arvados.keep.Counter()
        self.read_ops_counter = arvados.keep.Counter()
        self.write_ops_counter = arvados.keep.Counter()
416 # Allow threads that are waiting for the driver to be finished
417 # initializing to continue
420 def metric_samples(self):
421 return self.fuse_time.collect()[0].samples
    def metric_op_names(self):
        # Derive the distinct 'op' label values present in the metrics.
        # NOTE(review): the accumulator initialization, the append, and the
        # return statement appear elided in this copy.
        for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
            if cur_op not in ops:
430 def metric_value(self, opname, metric):
431 op_value = [sample.value for sample in self.metric_samples()
432 if sample.name == metric and sample.labels['op'] == opname]
433 return op_value[0] if len(op_value) == 1 else None
435 def metric_sum_func(self, opname):
436 return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
438 def metric_count_func(self, opname):
439 return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
    # NOTE(review): the enclosing "def destroy(self):" line and several
    # statements of that method appear elided in this copy.  Shutdown path:
    # flag that teardown has started so forget() becomes a no-op.
        self._shutdown_started.set()

        # Different versions of llfuse require and forbid us to
        # acquire the lock here. See #8345#note-37, #10805#note-9.
        if LLFUSE_VERSION_0 and llfuse.lock.acquire():
            llfuse.lock.release()

    def access(self, inode, mode, ctx):
        # NOTE(review): the method body appears elided in this copy.

    def listen_for_events(self):
        # Subscribe to the Arvados event bus for object change events; each
        # received event is dispatched to on_event().
        self.events = arvados.events.subscribe(
            [["event_type", "in", ["create", "update", "delete"]]],
            # NOTE(review): the remaining arguments to subscribe() appear
            # elided in this copy.
    @on_event_time.time()
    def on_event(self, ev):
        # Event-bus callback: refresh any cached objects affected by *ev*.
        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
            # NOTE(review): the early "return" and the "with llfuse.lock:"
            # line appear elided in this copy.
            properties = ev.get("properties") or {}
            old_attrs = properties.get("old_attributes") or {}
            new_attrs = properties.get("new_attributes") or {}

            # Invalidate every cached object with the changed uuid.
            for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                # NOTE(review): the per-item invalidation call appears elided.

            # Notify the old and new owner projects so their listings refresh.
            oldowner = old_attrs.get("owner_uuid")
            newowner = ev.get("object_owner_uuid")
            # NOTE(review): the "for parent in (" line appears elided before
            # the two lookups below, as does the loop body.
                    self.inodes.inode_cache.find_by_uuid(oldowner) +
                    self.inodes.inode_cache.find_by_uuid(newowner)):
    def getattr(self, inode, ctx=None):
        # Build the llfuse EntryAttributes for *inode*.
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        # NOTE(review): st_ino/generation assignments appear elided here.
        entry.entry_timeout = 0
        # Attributes may be cached by the kernel until the object's next poll.
        entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0

        # Everything is world-readable.
        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
            # NOTE(review): an "else:" line appears elided before the regular-
            # file branch below.
            entry.st_mode |= stat.S_IFREG
            if isinstance(e, FuseArvadosFile):
                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

        # Write bits only when the mount allows writes and the object does.
        if self.enable_write and e.writable():
            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

        entry.st_uid = self.uid
        entry.st_gid = self.gid

        entry.st_size = e.size()

        entry.st_blksize = 512
        entry.st_blocks = (entry.st_size // 512) + 1
        if hasattr(entry, 'st_atime_ns'):
            # This llfuse exposes nanosecond timestamp fields.
            entry.st_atime_ns = int(e.atime() * 1000000000)
            entry.st_mtime_ns = int(e.mtime() * 1000000000)
            entry.st_ctime_ns = int(e.mtime() * 1000000000)
            # NOTE(review): an "else:" line appears elided before the whole-
            # second fallback fields below.
            entry.st_atime = int(e.atime)
            entry.st_mtime = int(e.mtime)
            entry.st_ctime = int(e.mtime)

        # NOTE(review): the trailing "return entry" appears elided in this copy.
    def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
        # FUSE setattr: only size changes (truncate) are acted upon.
        entry = self.getattr(inode)

        if fh is not None and fh in self._filehandles:
            handle = self._filehandles[fh]
            # NOTE(review): the assignment taking the object from the handle,
            # and the "else:" line, appear elided before the fallback below.
            e = self.inodes[inode]

        # NOTE(review): the "if fields is None:" branch (older llfuse passes
        # the size directly on attr) appears elided around the next two lines.
            update_size = attr.st_size is not None
            # NOTE(review): an "else:" line appears elided here.
            update_size = fields.update_size
        if update_size and isinstance(e, FuseArvadosFile):
            # Truncation touches Keep, so drop the llfuse lock while doing it.
            with llfuse.lock_released:
                e.arvfile.truncate(attr.st_size)
                entry.st_size = e.arvfile.size()

        # NOTE(review): the trailing "return entry" appears elided in this copy.
    def lookup(self, parent_inode, name, ctx=None):
        # Resolve *name* within *parent_inode* to an inode and its attributes.
        name = str(name, self.inodes.encoding)
        # NOTE(review): the initialization of "inode" and the "." special case
        # appear elided in this copy.

        if parent_inode in self.inodes:
            p = self.inodes[parent_inode]
            # NOTE(review): a touch() of the parent and the '..' comparison
            # appear elided before the branch below.
                inode = p.parent_inode
            elif isinstance(p, Directory) and name in p:
                inode = p[name].inode

        # NOTE(review): the "if inode != None:" guard appears elided before
        # the success path below.
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                      parent_inode, name, inode)
            # Each successful lookup hands the kernel a reference; forget()
            # gives it back.
            self.inodes[inode].inc_ref()
            return self.getattr(inode)
        # NOTE(review): an "else:" line appears elided before the not-found path.
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
                      parent_inode, name)
            raise llfuse.FUSEError(errno.ENOENT)
    def forget(self, inodes):
        # FUSE forget: the kernel is returning lookup references; reap
        # entries whose count drops to zero and that are already dead.
        if self._shutdown_started.is_set():
            # No-op after destroy() has begun (see __init__ comment).
            # NOTE(review): the early "return" appears elided in this copy.
        for inode, nlookup in inodes:
            ent = self.inodes[inode]
            _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
            if ent.dec_ref(nlookup) == 0 and ent.dead:
                self.inodes.del_entry(ent)
    def open(self, inode, flags, ctx=None):
        # FUSE open: validate the target and hand back a new file handle.
        if inode in self.inodes:
            p = self.inodes[inode]
            # NOTE(review): an "else:" line appears elided before the raise.
            raise llfuse.FUSEError(errno.ENOENT)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
            raise llfuse.FUSEError(errno.EPERM)

        fh = next(self._filehandles_counter)
        self._filehandles[fh] = FileHandle(fh, p)
        # NOTE(review): a touch() of the opened object appears elided here.

        # Normally, we will have received an "update" event if the
        # parent collection is stale here. However, even if the parent
        # collection hasn't changed, the manifest might have been
        # fetched so long ago that the signatures on the data block
        # locators have expired. Calling checkupdate() on all
        # ancestors ensures the signatures will be refreshed if
        # necessary.
        while p.parent_inode in self.inodes:
            if p == self.inodes[p.parent_inode]:
                # NOTE(review): the "break" ending the self-parent case
                # appears elided here.
            p = self.inodes[p.parent_inode]
            # NOTE(review): the touch()/checkupdate() calls of the loop body
            # appear elided in this copy.

        _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)

        # NOTE(review): the trailing "return fh" appears elided in this copy.
    def read(self, fh, off, size):
        # FUSE read: return up to *size* bytes from offset *off* of handle *fh*.
        _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
        self.read_ops_counter.add(1)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
            # NOTE(review): an "else:" line appears elided before the raise.
            raise llfuse.FUSEError(errno.EBADF)

        self.inodes.touch(handle.obj)

        r = handle.obj.readfrom(off, size, self.num_retries)
        # Account for the bytes actually delivered.
        # NOTE(review): the guard around this add and the trailing "return r"
        # appear elided in this copy.
        self.read_counter.add(len(r))
    def write(self, fh, off, buf):
        # FUSE write: write *buf* at offset *off* of handle *fh*, returning
        # the number of bytes written (trailing return elided in this copy).
        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
        self.write_ops_counter.add(1)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
            # NOTE(review): an "else:" line appears elided before the raise.
            raise llfuse.FUSEError(errno.EBADF)

        if not handle.obj.writable():
            raise llfuse.FUSEError(errno.EPERM)

        self.inodes.touch(handle.obj)

        w = handle.obj.writeto(off, buf, self.num_retries)

        # NOTE(review): the guard around this add and the trailing "return w"
        # appear elided in this copy.
        self.write_counter.add(w)
    def release(self, fh):
        # FUSE release: flush and drop the handle, then let the inode cache
        # evict anything that is now evictable.
        if fh in self._filehandles:
            _logger.debug("arv-mount release fh %i", fh)
            # NOTE(review): the try/except/finally framing around the next
            # three statements appears elided in this copy.
                self._filehandles[fh].flush()
                self._filehandles[fh].release()
                del self._filehandles[fh]
        self.inodes.inode_cache.cap_cache()

    def releasedir(self, fh):
        # NOTE(review): the body appears elided in this copy.
    def opendir(self, inode, ctx=None):
        # FUSE opendir: snapshot the directory contents into a DirectoryHandle.
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
            # NOTE(review): an "else:" line appears elided before the raise.
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = next(self._filehandles_counter)
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
            # NOTE(review): an "else:" line appears elided before the raise.
            raise llfuse.FUSEError(errno.EIO)

        # '.' and '..' lead the snapshot so readdir can serve them by offset.
        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
        # NOTE(review): the trailing "return fh" appears elided in this copy.
    def readdir(self, fh, off):
        # FUSE readdir: generator yielding (name, attrs, next_offset) triples
        # from the snapshot taken at opendir() time, starting at *off*.
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
            # NOTE(review): an "else:" line appears elided before the raise.
            raise llfuse.FUSEError(errno.EBADF)

        # NOTE(review): the initialization "e = off" and the loop increment
        # appear elided in this copy.
        while e < len(handle.entries):
            # Skip entries whose inode has been deleted since the snapshot.
            if handle.entries[e][1].inode in self.inodes:
                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
    def statfs(self, ctx=None):
        # FUSE statfs: report synthetic filesystem statistics.
        st = llfuse.StatvfsData()
        st.f_bsize = 128 * 1024
        # NOTE(review): the remaining field assignments and the "return st"
        # appear elided in this copy.
    def _check_writable(self, inode_parent):
        # Shared precondition check for the mutating handlers below: the
        # mount must be writable and *inode_parent* must resolve to a
        # writable Directory (which is returned; the trailing "return p"
        # appears elided in this copy).
        if not self.enable_write:
            raise llfuse.FUSEError(errno.EROFS)

        if inode_parent in self.inodes:
            p = self.inodes[inode_parent]
            # NOTE(review): an "else:" line appears elided before the raise.
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        # NOTE(review): the "if not p.writable():" guard appears elided
        # before the raise below.
            raise llfuse.FUSEError(errno.EPERM)
    def create(self, inode_parent, name, mode, flags, ctx=None):
        # FUSE create: make a new file entry in a writable parent directory.
        name = name.decode(encoding=self.inodes.encoding)
        _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)

        p = self._check_writable(inode_parent)
        # NOTE(review): the call creating the entry on the parent appears
        # elided in this copy.

        # The file entry should have been implicitly created by callback.
        # NOTE(review): the lookup of the new entry ("f = p[name]") appears
        # elided before the handle creation below.
        fh = next(self._filehandles_counter)
        self._filehandles[fh] = FileHandle(fh, f)
        # NOTE(review): touch/inc_ref calls appear elided before the return.
        return (fh, self.getattr(f.inode))

    def mkdir(self, inode_parent, name, mode, ctx=None):
        # FUSE mkdir: analogous to create() but for directories.
        name = name.decode(encoding=self.inodes.encoding)
        _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)

        p = self._check_writable(inode_parent)
        # NOTE(review): the mkdir call on the parent appears elided here.

        # The dir entry should have been implicitly created by callback.
        # NOTE(review): the lookup of the new entry and its inc_ref appear
        # elided before the return below.
        return self.getattr(d.inode)
    def unlink(self, inode_parent, name, ctx=None):
        # FUSE unlink: remove file *name* from a writable parent directory.
        name = name.decode(encoding=self.inodes.encoding)
        _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
        p = self._check_writable(inode_parent)
        # NOTE(review): the call removing the entry from the parent appears
        # elided in this copy.

    def rmdir(self, inode_parent, name, ctx=None):
        # FUSE rmdir: remove directory *name* from a writable parent directory.
        name = name.decode(encoding=self.inodes.encoding)
        _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
        p = self._check_writable(inode_parent)
        # NOTE(review): the call removing the directory from the parent
        # appears elided in this copy.
814 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
815 name_old = name_old.decode(encoding=self.inodes.encoding)
816 name_new = name_new.decode(encoding=self.inodes.encoding)
817 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
818 src = self._check_writable(inode_parent_old)
819 dest = self._check_writable(inode_parent_new)
820 dest.rename(name_old, name_new, src)
    # NOTE(review): the enclosing "def flush(self, fh):" line appears elided
    # in this copy; flush pushes pending writes for an open handle.
        if fh in self._filehandles:
            self._filehandles[fh].flush()

    def fsync(self, fh, datasync):
        # NOTE(review): the method body appears elided in this copy.

    def fsyncdir(self, fh, datasync):
        # NOTE(review): the method body appears elided in this copy.