1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before returning a result.
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
77 from prometheus_client import Summary
80 # Default _notify_queue has a limit of 1000 items, but it really needs to be
81 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# NOTE(review): the tail of this comment and adjacent lines are missing
# from this extract.
84 if hasattr(llfuse, 'capi'):
# Old llfuse releases keep the notification queue on the C extension
# submodule llfuse.capi; replace it with an unbounded queue.Queue.
86 llfuse.capi._notify_queue = queue.Queue()
# Newer llfuse exposes the queue directly on the llfuse module.
# NOTE(review): the `else:` line is missing from this extract.
89 llfuse._notify_queue = queue.Queue()
# True when running against the pre-1.0 llfuse API; affects the explicit
# lock handling done during shutdown (see the destroy() fragment below).
91 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
93 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
94 from .fusefile import StringFile, FuseArvadosFile
# Module-level logger shared by everything in this file.
96 _logger = logging.getLogger('arvados.arvados_fuse')
98 # Uncomment this to enable llfuse debug logging.
99 # log_handler = logging.StreamHandler()
100 # llogger = logging.getLogger('llfuse')
101 # llogger.addHandler(log_handler)
102 # llogger.setLevel(logging.DEBUG)
104 class Handle(object):
105 """Connects a numeric file handle to a File or Directory object that has
106 been opened by the client."""
# fh: numeric file handle issued to the kernel; obj: the opened File or
# Directory object. NOTE(review): the body of __init__ is missing from
# this extract — presumably it stores fh/obj on self; confirm against
# the full source.
108 def __init__(self, fh, obj):
120 class FileHandle(Handle):
121 """Connects a numeric file handle to a File object that has
122 been opened by the client."""
# NOTE(review): the enclosing `def flush(self):` line is missing from
# this extract. The body flushes buffered writes, but only when the
# underlying file object is writable.
125 if self.obj.writable():
126 return self.obj.flush()
class DirectoryHandle(Handle):
    """File handle wrapper for an open directory.

    Besides the directory object itself, this keeps the list of directory
    entries captured when the handle was created, so that readdir() can
    iterate over a stable snapshot.
    """

    def __init__(self, fh, dirobj, entries):
        # Delegate handle bookkeeping to the base class, then record the
        # entry-list snapshot taken at open time.
        Handle.__init__(self, fh, dirobj)
        self.entries = entries
138 class InodeCache(object):
139 """Records the memory footprint of objects and when they are last used.
141 When the cache limit is exceeded, the least recently used objects are
142 cleared. Clearing the object means discarding its contents to release
143 memory. The next time the object is accessed, it must be re-fetched from
144 the server. Note that the inode cache limit is a soft limit; the cache
145 limit may be exceeded if necessary to load very large objects, it may also
146 be exceeded if open file handles prevent objects from being cleared.
# cap: soft limit (in bytes) on total cached object footprint.
# min_entries: never evict below this many entries, regardless of cap.
150 def __init__(self, cap, min_entries=4):
# OrderedDict gives LRU ordering: entries are removed and re-inserted on
# touch, so iteration runs from least to most recently used.
151 self._entries = collections.OrderedDict()
# NOTE(review): initialization of the other attributes used elsewhere in
# this class (the size cap, the running total `_total`, and the uuid
# index `_by_uuid`) is missing from this extract.
155 self.min_entries = min_entries
160 def _remove(self, obj, clear):
# Drop obj from the cache. When `clear` is True the object's contents
# are discarded (true eviction, with kernel invalidation); when False
# the caller is only reordering the LRU and will re-insert the entry.
# NOTE(review): several lines (docstring, guards, early returns and the
# in_use() check) are missing from this extract.
162 # Kernel behavior seems to be that if a file is
163 # referenced, its parents remain referenced too. This
164 # means has_ref() exits early when a collection is not
165 # candidate for eviction.
167 # By contrast, in_use() doesn't increment references on
168 # parents, so it requires a full tree walk to determine if
169 # a collection is a candidate for eviction. This takes
170 # .07s for 240000 files, which becomes a major drag when
171 # cap_cache is being called several times a second and
172 # there are multiple non-evictable collections in the
175 # So it is important for performance that we do the
176 # has_ref() check first.
178 if obj.has_ref(True):
179 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
# NOTE(review): the early `return` and the `if obj.in_use():` guard are
# missing from this extract.
183 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
# Ask the kernel to drop its cached data for this inode before we
# discard our own copy.
186 obj.kernel_invalidate()
187 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
190 # The llfuse lock is released in del_entry(), which is called by
191 # Directory.clear(). While the llfuse lock is released, it can happen
192 # that a reentrant call removes this entry before this call gets to it.
193 # Ensure that the entry is still valid before trying to remove it.
194 if obj.inode not in self._entries:
# Subtract the size recorded at insert time (obj.cache_size), not the
# current objsize(), so the running total stays consistent.
197 self._total -= obj.cache_size
198 del self._entries[obj.inode]
# Drop obj from the uuid index, removing the index bucket entirely when
# it becomes empty.
200 self._by_uuid[obj.cache_uuid].remove(obj)
201 if not self._by_uuid[obj.cache_uuid]:
202 del self._by_uuid[obj.cache_uuid]
203 obj.cache_uuid = None
205 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
# NOTE(review): the enclosing `def cap_cache(self):` line is missing
# from this extract. Walks entries in LRU order, evicting until the
# memory total is back under the cap.
208 if self._total > self.cap:
209 for ent in listvalues(self._entries):
# Stop once under the cap, but never shrink below min_entries.
# NOTE(review): the loop-exit statement for this condition is missing
# from this extract.
210 if self._total < self.cap or len(self._entries) < self.min_entries:
212 self._remove(ent, True)
214 def manage(self, obj):
# Start tracking obj in the cache. NOTE(review): a guard line (the
# original checks obj.persisted() — confirm) appears to be missing from
# this extract, along with two `else:` lines below.
# Record the size now so _remove() can subtract the same amount later,
# even if the object's size changes while cached.
216 obj.cache_size = obj.objsize()
217 self._entries[obj.inode] = obj
218 obj.cache_uuid = obj.uuid()
# Index by uuid so event-bus notifications can locate live objects.
220 if obj.cache_uuid not in self._by_uuid:
221 self._by_uuid[obj.cache_uuid] = [obj]
223 if obj not in self._by_uuid[obj.cache_uuid]:
224 self._by_uuid[obj.cache_uuid].append(obj)
225 self._total += obj.objsize()
226 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
227 obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
230 def touch(self, obj):
# Mark obj as most recently used: remove it without clearing so a
# subsequent manage() re-inserts it at the MRU end of the OrderedDict.
# NOTE(review): the persisted() guard and the manage() re-insert lines
# are missing from this extract.
232 if obj.inode in self._entries:
233 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking a persisted object, clearing its cached contents.

    Non-persisted objects and objects not currently in the cache are
    left untouched.
    """
    if not obj.persisted():
        return
    if obj.inode in self._entries:
        # Remove with clear=True: evict the contents, not just reorder.
        self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return the list of cached objects recorded under this uuid.

    Returns an empty list when no cached object has that uuid.
    """
    try:
        return self._by_uuid[uuid]
    except KeyError:
        return []
# NOTE(review): the enclosing `def clear(self):` line is missing from
# this extract. Resets the cache: drops all entries and the uuid index.
244 self._entries.clear()
245 self._by_uuid.clear()
248 class Inodes(object):
249 """Manage the set of inodes. This is the mapping from a numeric id
250 to a concrete File or Directory object"""
252 def __init__(self, inode_cache, encoding="utf-8"):
# NOTE(review): initialization of the inode-number-to-object mapping
# (self._entries) is missing from this extract.
# Inode numbers are handed out sequentially starting at the FUSE root
# inode, so the first entry added becomes the mount's root directory.
254 self._counter = itertools.count(llfuse.ROOT_INODE)
255 self.inode_cache = inode_cache
# Filesystem name encoding used when exchanging names with the kernel.
256 self.encoding = encoding
257 self.deferred_invalidations = []
def __getitem__(self, item):
    """Return the file system object registered for inode number *item*."""
    entry = self._entries[item]
    return entry
def __setitem__(self, key, item):
    """Register *item* as the file system object for inode number *key*."""
    self._entries.update({key: item})
# NOTE(review): the enclosing `def` line is missing from this extract —
# presumably __iter__, yielding the registered inode numbers.
266 return iter(self._entries.keys())
# NOTE(review): the enclosing `def` line is missing from this extract —
# presumably items(), returning a view of (inode, object) pairs.
269 return viewitems(self._entries.items())
def __contains__(self, k):
    """Return True when inode number *k* has a registered object."""
    return self._entries.__contains__(k)
def touch(self, entry):
    """Record that *entry* was accessed just now.

    Updates the entry's access time and bumps it to the most-recently-used
    position in the inode cache.
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
278 def add_entry(self, entry):
# Assign the next inode number and start tracking the entry.
279 entry.inode = next(self._counter)
280 if entry.inode == llfuse.ROOT_INODE:
# NOTE(review): the body of this root-inode special case is missing
# from this extract.
282 self._entries[entry.inode] = entry
283 self.inode_cache.manage(entry)
# NOTE(review): trailing lines (likely a return of the entry) are
# missing from this extract.
286 def del_entry(self, entry):
# Delete an entry outright only once the kernel holds no references to
# it; otherwise deletion is deferred (see forget()).
287 if entry.ref_count == 0:
288 self.inode_cache.unmanage(entry)
289 del self._entries[entry.inode]
# Finalization may block on I/O, so it runs with the llfuse lock
# released. NOTE(review): the body of this `with` block and the `else:`
# branch structure are missing from this extract.
290 with llfuse.lock_released:
295 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, entry):
    """Ask the kernel to drop its cached attributes for *entry*."""
    if not entry.has_ref(False):
        # The kernel never did a lookup on this inode (or has already
        # forgotten it), so there is nothing to invalidate.
        return
    llfuse.invalidate_inode(entry.inode)
def invalidate_entry(self, entry, name):
    """Ask the kernel to drop its cached directory entry *name* under *entry*."""
    if entry.has_ref(False):
        # Only needed if the kernel previously looked this up and may
        # still have the dentry cached.
        encoded = native(name.encode(self.encoding))
        llfuse.invalidate_entry(entry.inode, encoded)
# NOTE(review): the enclosing `def clear(self):` line and the llfuse
# lock-release context around this loop are missing from this extract.
310 self.inode_cache.clear()
# Finalize every registered object, logging (not raising) so one
# failure does not stop teardown. NOTE(review): the `try:` and the
# finalize call lines are missing from this extract.
312 for k,v in viewitems(self._entries):
315 except Exception as e:
316 _logger.exception("Error during finalize of inode %i", k)
318 self._entries.clear()
321 def catch_exceptions(orig_func):
322 """Catch uncaught exceptions and log them consistently."""
# Decorator for FUSE request handlers: known error types are translated
# into llfuse.FUSEError with an appropriate errno so the kernel receives
# a sane error code instead of an unhandled Python exception.
324 @functools.wraps(orig_func)
325 def catch_exceptions_wrapper(self, *args, **kwargs):
# NOTE(review): the `try:` line is missing from this extract.
327 return orig_func(self, *args, **kwargs)
# Already the right exception type; re-raised unchanged.
# NOTE(review): the bare `raise` line is missing from this extract.
328 except llfuse.FUSEError:
# OS-level errors already carry an errno; pass it through to the kernel.
330 except EnvironmentError as e:
331 raise llfuse.FUSEError(e.errno)
# Keep storage failures surface as generic I/O errors after logging.
332 except arvados.errors.KeepWriteError as e:
333 _logger.error("Keep write error: " + str(e))
334 raise llfuse.FUSEError(errno.EIO)
335 except arvados.errors.NotFoundError as e:
336 _logger.error("Block not found error: " + str(e))
337 raise llfuse.FUSEError(errno.EIO)
# NOTE(review): the catch-all `except Exception:` line is missing from
# this extract.
339 _logger.exception("Unhandled exception during FUSE operation")
340 raise llfuse.FUSEError(errno.EIO)
342 return catch_exceptions_wrapper
345 class Operations(llfuse.Operations):
346 """This is the main interface with llfuse.
348 The methods on this object are called by llfuse threads to service FUSE
349 events to query and read from the file system.
351 llfuse has its own global lock which is acquired before calling a request handler,
352 so request handlers do not run concurrently unless the lock is explicitly released
353 using 'with llfuse.lock_released:'
# NOTE(review): the docstring's closing quotes are on lines missing from
# this extract.
# Prometheus Summary shared by all Operations instances (class-level),
# with one labeled child per FUSE operation; handlers use these children
# as timers via their .time() decorator/context manager.
357 fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
358 read_time = fuse_time.labels(op='read')
359 write_time = fuse_time.labels(op='write')
360 destroy_time = fuse_time.labels(op='destroy')
361 on_event_time = fuse_time.labels(op='on_event')
362 getattr_time = fuse_time.labels(op='getattr')
363 setattr_time = fuse_time.labels(op='setattr')
364 lookup_time = fuse_time.labels(op='lookup')
365 forget_time = fuse_time.labels(op='forget')
366 open_time = fuse_time.labels(op='open')
367 release_time = fuse_time.labels(op='release')
368 opendir_time = fuse_time.labels(op='opendir')
369 readdir_time = fuse_time.labels(op='readdir')
370 statfs_time = fuse_time.labels(op='statfs')
371 create_time = fuse_time.labels(op='create')
372 mkdir_time = fuse_time.labels(op='mkdir')
373 unlink_time = fuse_time.labels(op='unlink')
374 rmdir_time = fuse_time.labels(op='rmdir')
375 rename_time = fuse_time.labels(op='rename')
376 flush_time = fuse_time.labels(op='flush')
378 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
379 super(Operations, self).__init__()
# NOTE(review): assignment of the uid/gid attributes (read later by
# getattr) is missing from this extract.
381 self._api_client = api_client
# NOTE(review): the `if not inode_cache:` guard line is missing here.
# Default soft cap on cached inode data: 256 MiB.
384 inode_cache = InodeCache(cap=256*1024*1024)
385 self.inodes = Inodes(inode_cache, encoding=encoding)
# When False the mount is read-only and mutating operations raise EROFS
# (see _check_writable()).
388 self.enable_write = enable_write
390 # dict of inode to filehandle
391 self._filehandles = {}
392 self._filehandles_counter = itertools.count(0)
394 # Other threads that need to wait until the fuse driver
395 # is fully initialized should wait() on this event object.
396 self.initlock = threading.Event()
398 # If we get overlapping shutdown events (e.g., fusermount -u
399 # -z and operations.destroy()) llfuse calls forget() on inodes
400 # that have already been deleted. To avoid this, we make
401 # forget() a no-op if called after destroy().
402 self._shutdown_started = threading.Event()
404 self.num_retries = num_retries
# Byte and operation counters backing read/write statistics.
406 self.read_counter = arvados.keep.Counter()
407 self.write_counter = arvados.keep.Counter()
408 self.read_ops_counter = arvados.keep.Counter()
409 self.write_ops_counter = arvados.keep.Counter()
# NOTE(review): the init() method these two comments belong to (its
# `def` line and the initlock.set() call) is missing from this extract.
414 # Allow threads that are waiting for the driver to be finished
415 # initializing to continue
def metric_samples(self):
    """Return the list of samples collected for the FUSE operation timer."""
    collected = self.fuse_time.collect()
    return collected[0].samples
421 def metric_op_names(self):
# Collect the distinct `op` label values present in the metric samples.
# NOTE(review): the accumulator initialization and the append/return
# lines are missing from this extract.
423 for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
424 if cur_op not in ops:
def metric_value(self, opname, metric):
    """Return the value of sample *metric* for operation *opname*.

    Returns None when no sample (or more than one sample) matches.
    """
    matches = []
    for sample in self.metric_samples():
        if sample.name == metric and sample.labels['op'] == opname:
            matches.append(sample.value)
    if len(matches) == 1:
        return matches[0]
    return None
def metric_sum_func(self, opname):
    """Return a zero-argument callable reporting total seconds spent in *opname*."""
    def _sum():
        return self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
    return _sum
def metric_count_func(self, opname):
    """Return a zero-argument callable reporting how many times *opname* has run.

    The callable reads the Prometheus summary's `..._count` sample via
    metric_value() and returns it as an int. metric_value() returns None
    when the sample is absent or ambiguous; previously that made the
    int() conversion raise TypeError, so propagate None instead.
    """
    def _count():
        value = self.metric_value(opname, "arvmount_fuse_operations_seconds_count")
        return None if value is None else int(value)
    return _count
# NOTE(review): the enclosing `def destroy(self):` line and surrounding
# statements are missing from this extract. Marks shutdown so forget()
# becomes a no-op (see __init__), then shuts down with version-specific
# lock handling.
442 self._shutdown_started.set()
447 # Different versions of llfuse require and forbid us to
448 # acquire the lock here. See #8345#note-37, #10805#note-9.
# Only pre-1.0 llfuse requires (and allows) explicit lock acquisition
# around shutdown work here. NOTE(review): statements between acquire
# and release are missing from this extract.
449 if LLFUSE_VERSION_0 and llfuse.lock.acquire():
452 llfuse.lock.release()
457 def access(self, inode, mode, ctx):
460 def listen_for_events(self):
# Subscribe to the Arvados event bus for create/update/delete events so
# on_event() can refresh live cached objects when they change upstream.
# NOTE(review): intervening argument lines of this subscribe() call are
# missing from this extract.
461 self.events = arvados.events.subscribe(
463 [["event_type", "in", ["create", "update", "delete"]]],
466 @on_event_time.time()
# NOTE(review): any additional decorator line is missing from this extract.
468 def on_event(self, ev):
# Handle a websocket event from the Arvados API server; ignore anything
# that is not a create/update/delete event.
469 if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
# NOTE(review): the early `return` and lock acquisition lines are
# missing from this extract.
472 properties = ev.get("properties") or {}
473 old_attrs = properties.get("old_attributes") or {}
474 new_attrs = properties.get("new_attributes") or {}
# Refresh every live cached object matching the event's uuid.
476 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
478 if ev.get("object_kind") == "arvados#collection":
479 pdh = new_attrs.get("portable_data_hash")
480 # new_attributes.modified_at currently lacks
481 # subsecond precision (see #6347) so use event_at
482 # which should always be the same.
483 stamp = ev.get("event_at")
# For a writable, locally-modified, non-trashed collection, sync to the
# event's record version directly instead of a full invalidate.
484 if (stamp and pdh and item.writable() and
485 item.collection is not None and
486 item.collection.modified() and
487 new_attrs.get("is_trashed") is not True):
488 item.update(to_record_version=(stamp, pdh))
# Notify cached old and new owner objects so their listings refresh.
# NOTE(review): the `for parent in (`-style line opening this loop is
# missing from this extract.
490 oldowner = old_attrs.get("owner_uuid")
491 newowner = ev.get("object_owner_uuid")
493 self.inodes.inode_cache.find_by_uuid(oldowner) +
494 self.inodes.inode_cache.find_by_uuid(newowner)):
495 parent.child_event(ev)
499 def getattr(self, inode, ctx=None):
# Build a llfuse.EntryAttributes describing the inode. NOTE(review):
# decorator lines (timing / catch_exceptions) are missing from this
# extract, as are st_ino/generation assignments and the final return.
500 if inode not in self.inodes:
501 raise llfuse.FUSEError(errno.ENOENT)
503 e = self.inodes[inode]
505 entry = llfuse.EntryAttributes()
508 entry.entry_timeout = 0
# Let the kernel cache attributes until the next scheduled poll, when
# attribute caching is allowed for this object.
509 entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
# Base permissions: world-readable.
511 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
512 if isinstance(e, Directory):
513 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
# NOTE(review): the `else:` line is missing from this extract.
515 entry.st_mode |= stat.S_IFREG
# Arvados-backed files get the execute bits — presumably so stored
# scripts can be run directly; confirm against the full source.
516 if isinstance(e, FuseArvadosFile):
517 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
519 if self.enable_write and e.writable():
520 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
523 entry.st_uid = self.uid
524 entry.st_gid = self.gid
527 entry.st_size = e.size()
529 entry.st_blksize = 512
# Block count rounded up so it always covers the full file size.
530 entry.st_blocks = (entry.st_size // 512) + 1
# Newer llfuse expects nanosecond timestamp fields...
531 if hasattr(entry, 'st_atime_ns'):
533 entry.st_atime_ns = int(e.atime() * 1000000000)
534 entry.st_mtime_ns = int(e.mtime() * 1000000000)
535 entry.st_ctime_ns = int(e.mtime() * 1000000000)
# ...older llfuse uses whole seconds. NOTE(review): the `else:` line is
# missing, and `e.atime` / `e.mtime` here lack the call parentheses used
# in the ns branch above — likely a transcription artifact; confirm
# against the full source.
538 entry.st_atime = int(e.atime)
539 entry.st_mtime = int(e.mtime)
540 entry.st_ctime = int(e.mtime)
546 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
# Apply attribute changes; only a size change (truncate) on an Arvados
# file is acted upon in the visible code. NOTE(review): decorator lines
# and the final `return entry` are missing from this extract.
547 entry = self.getattr(inode)
549 if fh is not None and fh in self._filehandles:
550 handle = self._filehandles[fh]
# NOTE(review): the lines that use `handle` / resolve the target object
# via the handle are missing from this extract.
553 e = self.inodes[inode]
# Older llfuse passes no `fields` object; detect a size update from
# attr itself. NOTE(review): the branch structure around these two
# assignments is missing from this extract.
557 update_size = attr.st_size is not None
560 update_size = fields.update_size
561 if update_size and isinstance(e, FuseArvadosFile):
# Truncation may perform network I/O; release the llfuse lock meanwhile.
562 with llfuse.lock_released:
563 e.arvfile.truncate(attr.st_size)
564 entry.st_size = e.arvfile.size()
570 def lookup(self, parent_inode, name, ctx=None):
# Resolve `name` inside directory `parent_inode`, bump the target's
# kernel reference count, and return its attributes. NOTE(review):
# decorator lines and the initialization of `inode` (plus the '.'
# handling) are missing from this extract.
571 name = str(name, self.inodes.encoding)
577 if parent_inode in self.inodes:
578 p = self.inodes[parent_inode]
# '..' resolves to the parent directory's own parent inode.
# NOTE(review): the `if name == '..':`-style line is missing here.
581 inode = p.parent_inode
582 elif isinstance(p, Directory) and name in p:
583 inode = p[name].inode
# NOTE(review): the `if inode:` guard line is missing from this extract.
586 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
587 parent_inode, name, inode)
# inc_ref() records that the kernel now references this inode; the
# reference is dropped again in forget().
588 self.inodes[inode].inc_ref()
589 return self.getattr(inode)
# NOTE(review): the `else:` line and part of this debug call's argument
# list are missing from this extract.
591 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
593 raise llfuse.FUSEError(errno.ENOENT)
597 def forget(self, inodes):
# The kernel is dropping `nlookup` references to each inode. When the
# reference count reaches zero and the entry is dead, delete it for real.
# No-op during shutdown: llfuse may send forgets for inodes that were
# already deleted (see the _shutdown_started comment in __init__).
598 if self._shutdown_started.is_set():
# NOTE(review): the `return` line for this guard is missing from this
# extract.
600 for inode, nlookup in inodes:
601 ent = self.inodes[inode]
602 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
603 if ent.dec_ref(nlookup) == 0 and ent.dead:
604 self.inodes.del_entry(ent)
608 def open(self, inode, flags, ctx=None):
# Open a file, returning a new numeric file handle. NOTE(review):
# decorator lines and the final `return fh` are missing from this extract.
609 if inode in self.inodes:
610 p = self.inodes[inode]
# NOTE(review): the `else:` line is missing from this extract.
612 raise llfuse.FUSEError(errno.ENOENT)
614 if isinstance(p, Directory):
615 raise llfuse.FUSEError(errno.EISDIR)
# Refuse write-mode opens on objects that are not writable.
617 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
618 raise llfuse.FUSEError(errno.EPERM)
620 fh = next(self._filehandles_counter)
621 self._filehandles[fh] = FileHandle(fh, p)
# NOTE(review): lines between here and the comment below are missing
# from this extract.
624 # Normally, we will have received an "update" event if the
625 # parent collection is stale here. However, even if the parent
626 # collection hasn't changed, the manifest might have been
627 # fetched so long ago that the signatures on the data block
628 # locators have expired. Calling checkupdate() on all
629 # ancestors ensures the signatures will be refreshed if
631 while p.parent_inode in self.inodes:
# An inode that is its own parent is the mount root: stop the walk.
# NOTE(review): the `break` and the checkupdate() call lines are
# missing from this extract.
632 if p == self.inodes[p.parent_inode]:
634 p = self.inodes[p.parent_inode]
638 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
644 def read(self, fh, off, size):
# Read up to `size` bytes at offset `off` from the open file handle.
# NOTE(review): decorator lines and the final `return r` are missing
# from this extract.
645 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
646 self.read_ops_counter.add(1)
648 if fh in self._filehandles:
649 handle = self._filehandles[fh]
# NOTE(review): the `else:` line is missing from this extract.
651 raise llfuse.FUSEError(errno.EBADF)
# Reading counts as recent use for inode-cache eviction purposes.
653 self.inodes.touch(handle.obj)
655 r = handle.obj.readfrom(off, size, self.num_retries)
657 self.read_counter.add(len(r))
662 def write(self, fh, off, buf):
# Write `buf` at offset `off` through the open file handle, returning
# the number of bytes written. NOTE(review): decorator lines and the
# final `return w` are missing from this extract.
663 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
664 self.write_ops_counter.add(1)
666 if fh in self._filehandles:
667 handle = self._filehandles[fh]
# NOTE(review): the `else:` line is missing from this extract.
669 raise llfuse.FUSEError(errno.EBADF)
671 if not handle.obj.writable():
672 raise llfuse.FUSEError(errno.EPERM)
# Writing counts as recent use for inode-cache eviction purposes.
674 self.inodes.touch(handle.obj)
676 w = handle.obj.writeto(off, buf, self.num_retries)
678 self.write_counter.add(w)
683 def release(self, fh):
# Close a file handle: flush pending writes, release the underlying
# object, drop the handle, then let the inode cache evict if over cap.
684 if fh in self._filehandles:
685 _logger.debug("arv-mount release fh %i", fh)
# NOTE(review): the try/except wrapper around this flush() call is
# missing from this extract.
687 self._filehandles[fh].flush()
691 self._filehandles[fh].release()
692 del self._filehandles[fh]
693 self.inodes.inode_cache.cap_cache()
695 def releasedir(self, fh):
700 def opendir(self, inode, ctx=None):
# Open a directory, returning a new numeric handle. NOTE(review):
# decorator lines and the final `return fh` are missing from this extract.
701 _logger.debug("arv-mount opendir: inode %i", inode)
703 if inode in self.inodes:
704 p = self.inodes[inode]
# NOTE(review): the `else:` line is missing from this extract.
706 raise llfuse.FUSEError(errno.ENOENT)
708 if not isinstance(p, Directory):
709 raise llfuse.FUSEError(errno.ENOTDIR)
711 fh = next(self._filehandles_counter)
712 if p.parent_inode in self.inodes:
713 parent = self.inodes[p.parent_inode]
# NOTE(review): the `else:` line is missing from this extract.
715 raise llfuse.FUSEError(errno.EIO)
# Snapshot the listing now ('.' and '..' prepended) so readdir() can
# iterate a stable list even if the directory changes afterwards.
719 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
724 def readdir(self, fh, off):
# Generator yielding directory entries starting at offset `off` from
# the snapshot taken at opendir() time. NOTE(review): decorator lines,
# the `e = off` initialization and the loop-increment line are missing
# from this extract.
725 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
727 if fh in self._filehandles:
728 handle = self._filehandles[fh]
# NOTE(review): the `else:` line is missing from this extract.
730 raise llfuse.FUSEError(errno.EBADF)
733 while e < len(handle.entries):
# Skip entries whose inode has been deleted since the handle opened.
734 if handle.entries[e][1].inode in self.inodes:
# Yield (encoded name, attributes, next offset) as llfuse expects.
735 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
740 def statfs(self, ctx=None):
# Report filesystem statistics to the kernel. NOTE(review): the
# remaining field assignments and the return are missing from this
# extract.
741 st = llfuse.StatvfsData()
742 st.f_bsize = 128 * 1024
755 def _check_writable(self, inode_parent):
# Common validation for mutating operations: the mount must be
# writable, and the parent inode must exist, be a directory, and be
# writable. Returns the parent Directory object. NOTE(review): the
# `return p` line is missing from this extract.
756 if not self.enable_write:
757 raise llfuse.FUSEError(errno.EROFS)
759 if inode_parent in self.inodes:
760 p = self.inodes[inode_parent]
# NOTE(review): the `else:` line is missing from this extract.
762 raise llfuse.FUSEError(errno.ENOENT)
764 if not isinstance(p, Directory):
765 raise llfuse.FUSEError(errno.ENOTDIR)
# NOTE(review): the `if not p.writable():`-style guard line is missing
# from this extract.
768 raise llfuse.FUSEError(errno.EPERM)
774 def create(self, inode_parent, name, mode, flags, ctx=None):
# Create and open a new file in a writable parent directory, returning
# (fh, attributes). NOTE(review): decorator lines are missing from this
# extract.
775 name = name.decode(encoding=self.inodes.encoding)
776 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
778 p = self._check_writable(inode_parent)
# NOTE(review): the call that creates the file within `p`, and the
# lookup assigning `f`, are missing from this extract.
781 # The file entry should have been implicitly created by callback.
783 fh = next(self._filehandles_counter)
784 self._filehandles[fh] = FileHandle(fh, f)
# NOTE(review): intervening lines (e.g. reference-count bookkeeping)
# are missing from this extract.
788 return (fh, self.getattr(f.inode))
792 def mkdir(self, inode_parent, name, mode, ctx=None):
# Create a new subdirectory in a writable parent directory, returning
# its attributes. NOTE(review): decorator lines are missing from this
# extract.
793 name = name.decode(encoding=self.inodes.encoding)
794 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
796 p = self._check_writable(inode_parent)
# NOTE(review): the call that creates the directory within `p`, and the
# lookup assigning `d`, are missing from this extract.
799 # The dir entry should have been implicitly created by callback.
803 return self.getattr(d.inode)
807 def unlink(self, inode_parent, name, ctx=None):
# Remove a file from a writable parent directory. NOTE(review):
# decorator lines and the call that actually removes `name` from `p`
# are missing from this extract.
808 name = name.decode(encoding=self.inodes.encoding)
809 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
810 p = self._check_writable(inode_parent)
815 def rmdir(self, inode_parent, name, ctx=None):
# Remove a subdirectory from a writable parent directory. NOTE(review):
# decorator lines and the call that actually removes `name` from `p`
# are missing from this extract.
816 name = name.decode(encoding=self.inodes.encoding)
817 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
818 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
    """Move an entry from one directory to another (possibly the same).

    Both parent directories must exist, be directories, and be writable;
    _check_writable() raises the appropriate FUSEError otherwise. The
    destination directory performs the actual move.
    """
    old_name = name_old.decode(encoding=self.inodes.encoding)
    new_name = name_new.decode(encoding=self.inodes.encoding)
    _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, old_name, inode_parent_new, new_name)
    src = self._check_writable(inode_parent_old)
    dest = self._check_writable(inode_parent_new)
    dest.rename(old_name, new_name, src)
# NOTE(review): the enclosing `def flush(...)` line is missing from this
# extract; flushes pending writes on the handle if it is still open.
834 if fh in self._filehandles:
835 self._filehandles[fh].flush()
# NOTE(review): body missing from this extract.
837 def fsync(self, fh, datasync):
# NOTE(review): body missing; this definition may continue past the end
# of this extract.
840 def fsyncdir(self, fh, datasync):