1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before returning a result.
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
an object that is live in the inode cache, that object is immediately updated.
"""
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
77 from prometheus_client import Summary
# Default _notify_queue has a limit of 1000 items, but it really needs to be
# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# details.

# llfuse exposes the queue in different places depending on its version:
# older releases keep it on the C extension module `llfuse.capi`.
if hasattr(llfuse, 'capi'):
    llfuse.capi._notify_queue = queue.Queue()
else:
    llfuse._notify_queue = queue.Queue()

LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
93 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
94 from .fusefile import StringFile, FuseArvadosFile
# Module-level logger shared by everything in this file.
_logger = logging.getLogger('arvados.arvados_fuse')

# Uncomment this to enable llfuse debug logging.
# log_handler = logging.StreamHandler()
# llogger = logging.getLogger('llfuse')
# llogger.addHandler(log_handler)
# llogger.setLevel(logging.DEBUG)
class Handle(object):
    """Connects a numeric file handle to a File or Directory object that has
    been opened by the client."""

    def __init__(self, fh, obj):
        """Record the handle number and the opened object.

        NOTE(review): the attribute assignments (and any use-count
        bookkeeping) appear to be elided in this copy of the file; other
        code reads `self.obj` (e.g. FileHandle.flush), so confirm against
        the upstream source.
        """
class FileHandle(Handle):
    """Connects a numeric file handle to a File object that has
    been opened by the client."""

    def flush(self):
        """Flush pending writes on the underlying file, if it is writable.

        Read-only files have nothing to flush, so this is a no-op for them.
        Called by Operations.release() and Operations.flush().
        """
        # The `def flush(self):` header was missing in this copy, leaving
        # this body orphaned; restored so handle.flush() callers work.
        if self.obj.writable():
            return self.obj.flush()
class DirectoryHandle(Handle):
    """Connects a numeric file handle to a Directory object that has
    been opened by the client."""

    def __init__(self, fh, dirobj, entries):
        super(DirectoryHandle, self).__init__(fh, dirobj)
        # Snapshot of the directory listing taken at opendir() time;
        # readdir() pages through this list by index.
        self.entries = entries
class InodeCache(object):
    """Records the memory footprint of objects and when they are last used.

    When the cache limit is exceeded, the least recently used objects are
    cleared. Clearing the object means discarding its contents to release
    memory. The next time the object is accessed, it must be re-fetched from
    the server. Note that the inode cache limit is a soft limit; the cache
    limit may be exceeded if necessary to load very large objects, it may also
    be exceeded if open file handles prevent objects from being cleared.
    """

    def __init__(self, cap, min_entries=4):
        # inode number -> object, in least-recently-managed order.
        self._entries = collections.OrderedDict()
        self.min_entries = min_entries
        # NOTE(review): initialization of self.cap, self._total and
        # self._by_uuid appears to be elided in this copy; the other
        # methods below read all three.

    def _remove(self, obj, clear):
        # Evict one object from the cache; `clear` controls whether its
        # contents are discarded as well.
        #
        # Kernel behavior seems to be that if a file is
        # referenced, its parents remain referenced too. This
        # means has_ref() exits early when a collection is not
        # candidate for eviction.
        #
        # By contrast, in_use() doesn't increment references on
        # parents, so it requires a full tree walk to determine if
        # a collection is a candidate for eviction. This takes
        # .07s for 240000 files, which becomes a major drag when
        # cap_cache is being called several times a second and
        # there are multiple non-evictable collections in the
        # cache.
        #
        # So it is important for performance that we do the
        # has_ref() check first.
        if obj.has_ref(True):
            _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
        # NOTE(review): an early return and an `in_use()` check appear to
        # be elided between the two debug calls in this copy.
        _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)

        obj.kernel_invalidate()
        _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)

        # The llfuse lock is released in del_entry(), which is called by
        # Directory.clear(). While the llfuse lock is released, it can happen
        # that a reentrant call removes this entry before this call gets to it.
        # Ensure that the entry is still valid before trying to remove it.
        if obj.inode not in self._entries:
            # NOTE(review): an early `return` appears to be elided here.
        self._total -= obj.cache_size
        del self._entries[obj.inode]
        # NOTE(review): a guard on obj.cache_uuid appears to be elided here.
        self._by_uuid[obj.cache_uuid].remove(obj)
        if not self._by_uuid[obj.cache_uuid]:
            del self._by_uuid[obj.cache_uuid]
        obj.cache_uuid = None
        _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)

    # NOTE(review): a `def cap_cache(self):` header appears to be elided
    # here; the following lines evict LRU entries until under the limit.
        if self._total > self.cap:
            for ent in listvalues(self._entries):
                if self._total < self.cap or len(self._entries) < self.min_entries:
                    # NOTE(review): a `break` appears to be elided here.
                self._remove(ent, True)

    def manage(self, obj):
        # Start (or refresh) tracking obj and account for its size.
        obj.cache_size = obj.objsize()
        self._entries[obj.inode] = obj
        obj.cache_uuid = obj.uuid()
        if obj.cache_uuid not in self._by_uuid:
            self._by_uuid[obj.cache_uuid] = [obj]
        # NOTE(review): an `else:` appears to be elided before this guard.
        if obj not in self._by_uuid[obj.cache_uuid]:
            self._by_uuid[obj.cache_uuid].append(obj)
        self._total += obj.objsize()
        _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
                      obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))

    def touch(self, obj):
        # Refresh obj's LRU position: remove without clearing, then
        # (presumably -- the re-manage step appears elided) re-add it.
        if obj.inode in self._entries:
            self._remove(obj, False)

    def unmanage(self, obj):
        # Stop tracking obj entirely and discard its contents.
        if obj.persisted() and obj.inode in self._entries:
            self._remove(obj, True)

    def find_by_uuid(self, uuid):
        # All cached objects backed by the given Arvados object UUID.
        return self._by_uuid.get(uuid, [])

    # NOTE(review): a `def clear(self):` header appears to be elided here.
        self._entries.clear()
        self._by_uuid.clear()
class Inodes(object):
    """Manage the set of inodes. This is the mapping from a numeric id
    to a concrete File or Directory object"""

    def __init__(self, inode_cache, encoding="utf-8"):
        # NOTE(review): initialization of self._entries appears to be
        # elided in this copy; the mapping methods below all read it.
        # Inode numbers are handed out sequentially starting at ROOT_INODE.
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.inode_cache = inode_cache
        self.encoding = encoding
        self.deferred_invalidations = []

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    # NOTE(review): a `def __iter__(self):` header appears to be elided here.
        return iter(self._entries.keys())

    # NOTE(review): an `items()`-style method header appears to be elided here.
        return viewitems(self._entries.items())

    def __contains__(self, k):
        return k in self._entries

    def touch(self, entry):
        # Record access time and refresh the entry's LRU position.
        entry._atime = time.time()
        self.inode_cache.touch(entry)

    def add_entry(self, entry):
        # Assign the next inode number and begin tracking the entry.
        entry.inode = next(self._counter)
        if entry.inode == llfuse.ROOT_INODE:
            # NOTE(review): the root-inode special case body appears to be
            # elided in this copy.
        self._entries[entry.inode] = entry
        self.inode_cache.manage(entry)
        # NOTE(review): a `return entry` appears to be elided here.

    def del_entry(self, entry):
        # Drop the entry once the kernel holds no more references to it.
        if entry.ref_count == 0:
            self.inode_cache.unmanage(entry)
            del self._entries[entry.inode]
            with llfuse.lock_released:
                # NOTE(review): finalization done with the llfuse lock
                # released appears to be elided in this copy.
        # NOTE(review): an `else:` appears to be elided before this debug.
            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)

    def invalidate_inode(self, entry):
        if entry.has_ref(False):
            # Only necessary if the kernel has previously done a lookup on this
            # inode and hasn't yet forgotten about it.
            llfuse.invalidate_inode(entry.inode)

    def invalidate_entry(self, entry, name):
        if entry.has_ref(False):
            # Only necessary if the kernel has previously done a lookup on this
            # inode and hasn't yet forgotten about it.
            llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))

    # NOTE(review): a `clear()`-style method header appears to be elided here.
        self.inode_cache.clear()

        for k,v in viewitems(self._entries):
            # NOTE(review): a `try:` with a finalize call on v appears to be
            # elided here.
            except Exception as e:
                _logger.exception("Error during finalize of inode %i", k)

        self._entries.clear()
def catch_exceptions(orig_func):
    """Catch uncaught exceptions and log them consistently.

    Decorator for FUSE request handlers: known error classes are translated
    into an appropriate `llfuse.FUSEError` (the only exception type llfuse
    reports back to the kernel); anything else is logged with a traceback
    and surfaced as EIO.
    """
    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        # The try/except skeleton was mangled in this copy (missing `try:`,
        # re-raise, and generic handler); restored from the fragments.
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            # Already the right type -- pass it through unchanged.
            raise
        except EnvironmentError as e:
            # OS-level errors carry a usable errno.
            raise llfuse.FUSEError(e.errno)
        except arvados.errors.KeepWriteError as e:
            _logger.error("Keep write error: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
        except arvados.errors.NotFoundError as e:
            _logger.error("Block not found error: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
class Operations(llfuse.Operations):
    """This is the main interface with llfuse.

    The methods on this object are called by llfuse threads to service FUSE
    events to query and read from the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using 'with llfuse.lock_released:'
    """

    # One Prometheus Summary with a labeled child per FUSE operation, used
    # by the request handlers to time themselves.
    fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
    read_time = fuse_time.labels(op='read')
    write_time = fuse_time.labels(op='write')
    destroy_time = fuse_time.labels(op='destroy')
    on_event_time = fuse_time.labels(op='on_event')
    getattr_time = fuse_time.labels(op='getattr')
    setattr_time = fuse_time.labels(op='setattr')
    lookup_time = fuse_time.labels(op='lookup')
    forget_time = fuse_time.labels(op='forget')
    open_time = fuse_time.labels(op='open')
    release_time = fuse_time.labels(op='release')
    opendir_time = fuse_time.labels(op='opendir')
    readdir_time = fuse_time.labels(op='readdir')
    statfs_time = fuse_time.labels(op='statfs')
    create_time = fuse_time.labels(op='create')
    mkdir_time = fuse_time.labels(op='mkdir')
    unlink_time = fuse_time.labels(op='unlink')
    rmdir_time = fuse_time.labels(op='rmdir')
    rename_time = fuse_time.labels(op='rename')
    flush_time = fuse_time.labels(op='flush')
    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
        super(Operations, self).__init__()

        self._api_client = api_client

        # NOTE(review): uid/gid assignments and an `if not inode_cache:`
        # guard appear to be elided here in this copy (getattr() reads
        # self.uid and self.gid).
        inode_cache = InodeCache(cap=256*1024*1024)
        self.inodes = Inodes(inode_cache, encoding=encoding)
        self.enable_write = enable_write

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = itertools.count(0)

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

        # If we get overlapping shutdown events (e.g., fusermount -u
        # -z and operations.destroy()) llfuse calls forget() on inodes
        # that have already been deleted. To avoid this, we make
        # forget() a no-op if called after destroy().
        self._shutdown_started = threading.Event()

        self.num_retries = num_retries

        # Byte/operation counters used for mount statistics reporting.
        self.read_counter = arvados.keep.Counter()
        self.write_counter = arvados.keep.Counter()
        self.read_ops_counter = arvados.keep.Counter()
        self.write_ops_counter = arvados.keep.Counter()

    # NOTE(review): an init()-style method header appears to be elided here;
    # the comments below describe signaling self.initlock.
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
418 def metric_samples(self):
419 return self.fuse_time.collect()[0].samples
421 def metric_op_names(self):
423 for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
424 if cur_op not in ops:
428 def metric_value(self, opname, metric):
429 op_value = [sample.value for sample in self.metric_samples()
430 if sample.name == metric and sample.labels['op'] == opname]
431 return op_value[0] if len(op_value) == 1 else None
433 def metric_sum_func(self, opname):
434 return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
436 def metric_count_func(self, opname):
437 return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
    # NOTE(review): a `destroy()`-style method header appears to be elided
    # here; the following lines are its body (shutdown of the mount).
        # Mark shutdown so forget() becomes a no-op (see __init__ comment).
        self._shutdown_started.set()
        # Different versions of llfuse require and forbid us to
        # acquire the lock here. See #8345#note-37, #10805#note-9.
        if LLFUSE_VERSION_0 and llfuse.lock.acquire():
            # NOTE(review): inode cleanup performed under the lock appears
            # to be elided in this copy.
            llfuse.lock.release()
    def access(self, inode, mode, ctx):
        """FUSE access() handler.

        NOTE(review): the body appears to be elided in this copy; per-mode
        permission checks are done in the individual handlers, so confirm
        what this returns against the upstream source.
        """
    def listen_for_events(self):
        # Subscribe to the Arvados event bus so cached objects can be
        # refreshed when they change server-side (handled by on_event).
        # NOTE(review): the remaining subscribe() arguments (API client and
        # callback) appear to be elided in this copy.
        self.events = arvados.events.subscribe(
            [["event_type", "in", ["create", "update", "delete"]]],
    @on_event_time.time()
    def on_event(self, ev):
        """Event-bus callback: refresh cached inodes that mirror the changed
        Arvados object (and, on ownership changes, its old/new parents)."""
        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
            # NOTE(review): an early `return` appears to be elided here.
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}

        # Refresh every cached object backed by the changed UUID.
        for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
            # NOTE(review): the per-item invalidate/update calls appear to
            # be elided in this copy.

        oldowner = old_attrs.get("owner_uuid")
        newowner = ev.get("object_owner_uuid")
        # NOTE(review): a `for ... in (` loop header appears to be elided
        # before these continuation lines (iterating old and new parents).
            self.inodes.inode_cache.find_by_uuid(oldowner) +
            self.inodes.inode_cache.find_by_uuid(newowner)):
    def getattr(self, inode, ctx=None):
        """FUSE getattr handler: build an EntryAttributes for `inode`."""
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        # NOTE(review): st_ino/generation assignments appear to be elided.
        entry.entry_timeout = 0
        # Attributes may be cached by the kernel only while the object is
        # not due for a poll.
        entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0

        # Everything is world-readable; execute bit for directories and
        # (below) for regular Arvados files.
        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        # NOTE(review): an `else:` appears to be elided before this branch.
            entry.st_mode |= stat.S_IFREG
            if isinstance(e, FuseArvadosFile):
                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

        # Write bits only when the mount is writable AND the object is.
        if self.enable_write and e.writable():
            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

        entry.st_uid = self.uid
        entry.st_gid = self.gid

        entry.st_size = e.size()

        entry.st_blksize = 512
        entry.st_blocks = (entry.st_size // 512) + 1
        # Newer llfuse exposes nanosecond timestamp fields.
        if hasattr(entry, 'st_atime_ns'):
            entry.st_atime_ns = int(e.atime() * 1000000000)
            entry.st_mtime_ns = int(e.mtime() * 1000000000)
            entry.st_ctime_ns = int(e.mtime() * 1000000000)
        # NOTE(review): an `else:` appears to be elided before this branch.
            # NOTE(review): `e.atime` here lacks the call parentheses used
            # in the branch above -- possible bug; confirm against upstream.
            entry.st_atime = int(e.atime)
            entry.st_mtime = int(e.mtime)
            entry.st_ctime = int(e.mtime)
        # NOTE(review): a `return entry` appears to be elided here.
    def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
        """FUSE setattr handler; only size changes (truncate) take effect."""
        entry = self.getattr(inode)

        if fh is not None and fh in self._filehandles:
            handle = self._filehandles[fh]
            # NOTE(review): the code using `handle` and the matching `else:`
            # appear to be elided in this copy.
        e = self.inodes[inode]

        # NOTE(review): the two update_size assignments below appear to be
        # the two arms of an elided `if fields is None:` / `else:` split
        # (older llfuse passes attr directly, newer passes a fields object).
        update_size = attr.st_size is not None
        update_size = fields.update_size
        if update_size and isinstance(e, FuseArvadosFile):
            # Truncation hits the network, so drop the global llfuse lock.
            with llfuse.lock_released:
                e.arvfile.truncate(attr.st_size)
                entry.st_size = e.arvfile.size()
        # NOTE(review): a `return entry` appears to be elided here.
    def lookup(self, parent_inode, name, ctx=None):
        """FUSE lookup handler: resolve `name` under `parent_inode` and
        return its attributes, bumping the kernel reference count."""
        name = str(name, self.inodes.encoding)
        # NOTE(review): initialization of `inode` and the '.' special case
        # appear to be elided in this copy.
        if parent_inode in self.inodes:
            p = self.inodes[parent_inode]
            # NOTE(review): the `if` (presumably name == '..') matching the
            # `elif` below appears to be elided here.
            inode = p.parent_inode
        elif isinstance(p, Directory) and name in p:
            inode = p[name].inode

        # NOTE(review): an `if inode:`-style guard appears to be elided here.
        _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                      parent_inode, name, inode)
        # inc_ref() mirrors the kernel's lookup count; forget() undoes it.
        self.inodes[inode].inc_ref()
        return self.getattr(inode)

        # NOTE(review): the not-found branch below is missing its `else:`
        # and the tail of the debug call in this copy.
        _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
        raise llfuse.FUSEError(errno.ENOENT)
586 def forget(self, inodes):
587 if self._shutdown_started.is_set():
589 for inode, nlookup in inodes:
590 ent = self.inodes[inode]
591 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
592 if ent.dec_ref(nlookup) == 0 and ent.dead:
593 self.inodes.del_entry(ent)
    def open(self, inode, flags, ctx=None):
        """FUSE open handler: allocate a FileHandle for `inode`."""
        if inode in self.inodes:
            p = self.inodes[inode]
        # NOTE(review): an `else:` appears to be elided before this raise.
            raise llfuse.FUSEError(errno.ENOENT)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        # Opening for write on a read-only object is refused up front.
        if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
            raise llfuse.FUSEError(errno.EPERM)

        fh = next(self._filehandles_counter)
        self._filehandles[fh] = FileHandle(fh, p)

        # Normally, we will have received an "update" event if the
        # parent collection is stale here. However, even if the parent
        # collection hasn't changed, the manifest might have been
        # fetched so long ago that the signatures on the data block
        # locators have expired. Calling checkupdate() on all
        # ancestors ensures the signatures will be refreshed if
        # necessary.
        while p.parent_inode in self.inodes:
            if p == self.inodes[p.parent_inode]:
                # NOTE(review): a `break` appears to be elided here (root
                # is its own parent, so this terminates the walk).
            p = self.inodes[p.parent_inode]
            # NOTE(review): the checkupdate() call appears to be elided.

        _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
        # NOTE(review): a `return fh` appears to be elided here.
633 def read(self, fh, off, size):
634 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
635 self.read_ops_counter.add(1)
637 if fh in self._filehandles:
638 handle = self._filehandles[fh]
640 raise llfuse.FUSEError(errno.EBADF)
642 self.inodes.touch(handle.obj)
644 r = handle.obj.readfrom(off, size, self.num_retries)
646 self.read_counter.add(len(r))
651 def write(self, fh, off, buf):
652 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
653 self.write_ops_counter.add(1)
655 if fh in self._filehandles:
656 handle = self._filehandles[fh]
658 raise llfuse.FUSEError(errno.EBADF)
660 if not handle.obj.writable():
661 raise llfuse.FUSEError(errno.EPERM)
663 self.inodes.touch(handle.obj)
665 w = handle.obj.writeto(off, buf, self.num_retries)
667 self.write_counter.add(w)
672 def release(self, fh):
673 if fh in self._filehandles:
674 _logger.debug("arv-mount release fh %i", fh)
676 self._filehandles[fh].flush()
680 self._filehandles[fh].release()
681 del self._filehandles[fh]
682 self.inodes.inode_cache.cap_cache()
    def releasedir(self, fh):
        """FUSE releasedir handler.

        NOTE(review): the body appears to be elided in this copy;
        presumably it disposes of the DirectoryHandle for `fh`.
        """
    def opendir(self, inode, ctx=None):
        """FUSE opendir handler: snapshot the directory listing into a
        DirectoryHandle so readdir can page through it."""
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        # NOTE(review): an `else:` appears to be elided before this raise.
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = next(self._filehandles_counter)
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        # NOTE(review): an `else:` appears to be elided before this raise.
            raise llfuse.FUSEError(errno.EIO)

        # '.' and '..' are synthesized; the rest is a snapshot of the
        # directory contents taken now.
        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
        # NOTE(review): a `return fh` appears to be elided here.
    def readdir(self, fh, off):
        """FUSE readdir handler: yield (name, attrs, next_offset) tuples
        from the snapshot taken at opendir() time, starting at `off`."""
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        # NOTE(review): an `else:` appears to be elided before this raise.
            raise llfuse.FUSEError(errno.EBADF)

        # NOTE(review): the initialization `e = off` appears to be elided.
        while e < len(handle.entries):
            # Skip entries whose inode has been deleted since opendir().
            if handle.entries[e][1].inode in self.inodes:
                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
            # NOTE(review): the increment `e += 1` appears to be elided.
    def statfs(self, ctx=None):
        """FUSE statfs handler: report synthetic file system statistics."""
        st = llfuse.StatvfsData()
        st.f_bsize = 128 * 1024
        # NOTE(review): the remaining StatvfsData field assignments and the
        # `return st` appear to be elided in this copy.
744 def _check_writable(self, inode_parent):
745 if not self.enable_write:
746 raise llfuse.FUSEError(errno.EROFS)
748 if inode_parent in self.inodes:
749 p = self.inodes[inode_parent]
751 raise llfuse.FUSEError(errno.ENOENT)
753 if not isinstance(p, Directory):
754 raise llfuse.FUSEError(errno.ENOTDIR)
757 raise llfuse.FUSEError(errno.EPERM)
    def create(self, inode_parent, name, mode, flags, ctx=None):
        """FUSE create handler: make a new file under the parent directory
        and open a handle on it."""
        name = name.decode(encoding=self.inodes.encoding)
        _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)

        p = self._check_writable(inode_parent)
        # NOTE(review): the call that actually creates `name` under `p`
        # appears to be elided here in this copy.

        # The file entry should have been implicitly created by callback.
        # NOTE(review): the binding of `f` (the new file entry) appears to
        # be elided here; it is used below.
        fh = next(self._filehandles_counter)
        self._filehandles[fh] = FileHandle(fh, f)
        # NOTE(review): touch/inc_ref bookkeeping appears to be elided here.

        return (fh, self.getattr(f.inode))
    def mkdir(self, inode_parent, name, mode, ctx=None):
        """FUSE mkdir handler: make a new subdirectory under the parent."""
        name = name.decode(encoding=self.inodes.encoding)
        _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)

        p = self._check_writable(inode_parent)
        # NOTE(review): the call that actually creates the directory appears
        # to be elided here in this copy.

        # The dir entry should have been implicitly created by callback.
        # NOTE(review): the binding of `d` (the new directory entry) appears
        # to be elided here; it is used below.

        return self.getattr(d.inode)
    def unlink(self, inode_parent, name, ctx=None):
        """FUSE unlink handler: remove `name` from the parent directory."""
        name = name.decode(encoding=self.inodes.encoding)
        _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
        p = self._check_writable(inode_parent)
        # NOTE(review): the delegation to the directory object (presumably
        # p.unlink(name)) appears to be elided in this copy.
    def rmdir(self, inode_parent, name, ctx=None):
        """FUSE rmdir handler: remove subdirectory `name` from the parent."""
        name = name.decode(encoding=self.inodes.encoding)
        _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
        p = self._check_writable(inode_parent)
        # NOTE(review): the delegation to the directory object (presumably
        # p.rmdir(name)) appears to be elided in this copy.
812 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
813 name_old = name_old.decode(encoding=self.inodes.encoding)
814 name_new = name_new.decode(encoding=self.inodes.encoding)
815 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
816 src = self._check_writable(inode_parent_old)
817 dest = self._check_writable(inode_parent_new)
818 dest.rename(name_old, name_new, src)
823 if fh in self._filehandles:
824 self._filehandles[fh].flush()
    def fsync(self, fh, datasync):
        """FUSE fsync handler.

        NOTE(review): the body appears to be elided in this copy;
        presumably it flushes the handle like flush() does.
        """
829 def fsyncdir(self, fh, datasync):