1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future import standard_library
58 standard_library.install_aliases()
59 from builtins import next
60 from builtins import str
61 from builtins import object
84 from prometheus_client import Summary
87 # Default _notify_queue has a limit of 1000 items, but it really needs to be
88 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# Monkey-patch llfuse's internal notification queue with an unbounded
# queue.Queue.  Older llfuse builds expose it on the `capi` submodule;
# newer ones keep it at module level, hence the hasattr() probe.
91 if hasattr(llfuse, 'capi'):
93 llfuse.capi._notify_queue = queue.Queue()
# NOTE(review): the `else:` line for the module-level assignment below is
# elided from this view -- confirm against the full file.
96 llfuse._notify_queue = queue.Queue()
# True when running against the 0.x llfuse API; lock-acquisition semantics
# differ between 0.x and later versions (used during shutdown).
98 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
100 from .fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
101 from .fusefile import StringFile, FuseArvadosFile
103 _logger = logging.getLogger('arvados.arvados_fuse')
105 # Uncomment this to enable llfuse debug logging.
106 # log_handler = logging.StreamHandler()
107 # llogger = logging.getLogger('llfuse')
108 # llogger.addHandler(log_handler)
109 # llogger.setLevel(logging.DEBUG)
111 class Handle(object):
112 """Connects a numeric file handle to a File or Directory object that has
113 been opened by the client."""
# fh is the numeric handle issued by Operations (from _filehandles_counter);
# obj is the File or Directory object it refers to.  The constructor body is
# elided from this view -- presumably it stores fh/obj and bumps the
# object's use count; confirm against the full file.
115 def __init__(self, fh, obj):
127 class FileHandle(Handle):
128 """Connects a numeric file handle to a File object that has
129 been opened by the client."""
# Flush delegates to the underlying file object, but only when it is
# writable -- read-only objects have nothing to commit.
# NOTE(review): the enclosing `def flush(self):` line is elided from this
# view; the two lines below are its body.
132 if self.obj.writable():
133 return self.obj.flush()
136 class DirectoryHandle(Handle):
137 """Connects a numeric file handle to a Directory object that has
138 been opened by the client."""
# entries is a snapshot of the directory listing taken at opendir() time,
# so readdir() iterates a stable list even if the directory changes.
140 def __init__(self, fh, dirobj, entries):
141 super(DirectoryHandle, self).__init__(fh, dirobj)
142 self.entries = entries
145 class InodeCache(object):
146 """Records the memory footprint of objects and when they are last used.
148 When the cache limit is exceeded, the least recently used objects are
149 cleared. Clearing the object means discarding its contents to release
150 memory. The next time the object is accessed, it must be re-fetched from
151 the server. Note that the inode cache limit is a soft limit; the cache
152 limit may be exceeded if necessary to load very large objects, it may also
153 be exceeded if open file handles prevent objects from being cleared.
# cap: soft limit on total cached bytes; min_entries: never evict below
# this many entries regardless of size.  OrderedDict gives LRU ordering:
# oldest-inserted entries are visited first during eviction.
157 def __init__(self, cap, min_entries=4):
158 self._entries = collections.OrderedDict()
162 self.min_entries = min_entries
# Evict one object.  clear=True discards the object's contents; clear=False
# only drops the bookkeeping entry (used by touch() before re-inserting).
167 def _remove(self, obj, clear):
169 # Kernel behavior seems to be that if a file is
170 # referenced, its parents remain referenced too. This
171 # means has_ref() exits early when a collection is not
172 # candidate for eviction.
174 # By contrast, in_use() doesn't increment references on
175 # parents, so it requires a full tree walk to determine if
176 # a collection is a candidate for eviction. This takes
177 # .07s for 240000 files, which becomes a major drag when
178 # cap_cache is being called several times a second and
179 # there are multiple non-evictable collections in the
182 # So it is important for performance that we do the
183 # has_ref() check first.
# Objects the kernel still references, or that are in use, cannot be
# cleared; the early-return lines after these debug messages are elided
# from this view.
185 if obj.has_ref(True):
186 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
190 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
# Tell the kernel to drop its cached copy of this inode before we discard
# our own.
193 obj.kernel_invalidate()
194 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
197 # The llfuse lock is released in del_entry(), which is called by
198 # Directory.clear(). While the llfuse lock is released, it can happen
199 # that a reentrant call removes this entry before this call gets to it.
200 # Ensure that the entry is still valid before trying to remove it.
201 if obj.inode not in self._entries:
# Drop bookkeeping: subtract the recorded size and remove from both the
# inode table and the uuid index (deleting the uuid bucket when empty).
204 self._total -= obj.cache_size
205 del self._entries[obj.inode]
207 self._by_uuid[obj.cache_uuid].remove(obj)
208 if not self._by_uuid[obj.cache_uuid]:
209 del self._by_uuid[obj.cache_uuid]
210 obj.cache_uuid = None
212 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
# Evict LRU entries until back under the cap, but never below min_entries.
# NOTE(review): the `def cap_cache(self):` line is elided from this view;
# the loop below is its body.
215 if self._total > self.cap:
216 for ent in listvalues(self._entries):
217 if self._total < self.cap or len(self._entries) < self.min_entries:
219 self._remove(ent, True)
# Start tracking an object: record its size, index it by inode and by
# uuid (an object may share a uuid with others, hence the list bucket).
221 def manage(self, obj):
223 obj.cache_size = obj.objsize()
224 self._entries[obj.inode] = obj
225 obj.cache_uuid = obj.uuid()
227 if obj.cache_uuid not in self._by_uuid:
228 self._by_uuid[obj.cache_uuid] = [obj]
230 if obj not in self._by_uuid[obj.cache_uuid]:
231 self._by_uuid[obj.cache_uuid].append(obj)
232 self._total += obj.objsize()
233 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
234 obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
# Mark an object as recently used: remove its entry without clearing
# contents, then (presumably, via elided lines) re-insert it at the MRU
# end of the OrderedDict -- confirm against the full file.
237 def touch(self, obj):
239 if obj.inode in self._entries:
240 self._remove(obj, False)
# Stop tracking an object entirely, clearing its contents.
243 def unmanage(self, obj):
244 if obj.persisted() and obj.inode in self._entries:
245 self._remove(obj, True)
# Return the (possibly empty) list of cached objects with this uuid.
247 def find_by_uuid(self, uuid):
248 return self._by_uuid.get(uuid, [])
# Drop all cache state.  NOTE(review): the enclosing `def clear(self):`
# line is elided from this view.
251 self._entries.clear()
252 self._by_uuid.clear()
255 class Inodes(object):
256 """Manage the set of inodes. This is the mapping from a numeric id
257 to a concrete File or Directory object"""
# Inode numbers are allocated sequentially starting at llfuse.ROOT_INODE;
# encoding is used when passing filenames back to the kernel.
259 def __init__(self, inode_cache, encoding="utf-8"):
261 self._counter = itertools.count(llfuse.ROOT_INODE)
262 self.inode_cache = inode_cache
263 self.encoding = encoding
264 self.deferred_invalidations = []
# Dict-style access delegates to the internal inode -> object table.
266 def __getitem__(self, item):
267 return self._entries[item]
269 def __setitem__(self, key, item):
270 self._entries[key] = item
# NOTE(review): the `def __iter__` and `def items` lines for the two
# bodies below are elided from this view.
273 return iter(self._entries.keys())
276 return self._entries.items()
278 def __contains__(self, k):
279 return k in self._entries
# Record access time and bump the entry in the LRU cache.
281 def touch(self, entry):
282 entry._atime = time.time()
283 self.inode_cache.touch(entry)
# Assign the next inode number and register the entry; the branch body
# for the ROOT_INODE case is elided from this view.
285 def add_entry(self, entry):
286 entry.inode = next(self._counter)
287 if entry.inode == llfuse.ROOT_INODE:
289 self._entries[entry.inode] = entry
290 self.inode_cache.manage(entry)
# Remove an entry once the kernel holds no more references to it; with
# outstanding references, only log (lines between the `with` block and
# the debug call are elided from this view).
293 def del_entry(self, entry):
294 if entry.ref_count == 0:
295 self.inode_cache.unmanage(entry)
296 del self._entries[entry.inode]
297 with llfuse.lock_released:
302 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
# Ask the kernel to forget its cached attributes for this inode.
304 def invalidate_inode(self, entry):
305 if entry.has_ref(False):
306 # Only necessary if the kernel has previously done a lookup on this
307 # inode and hasn't yet forgotten about it.
308 llfuse.invalidate_inode(entry.inode)
# Ask the kernel to forget a specific directory entry (name) under this
# inode; the name is encoded with the configured filesystem encoding.
310 def invalidate_entry(self, entry, name):
311 if entry.has_ref(False):
312 # Only necessary if the kernel has previously done a lookup on this
313 # inode and hasn't yet forgotten about it.
314 llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
# Teardown: clear the cache and finalize every live entry, logging (not
# propagating) per-entry failures.  NOTE(review): the enclosing `def
# clear(self):` line and the `try:` inside the loop are elided from this
# view.
317 self.inode_cache.clear()
319 for k,v in viewitems(self._entries):
322 except Exception as e:
323 _logger.exception("Error during finalize of inode %i", k)
325 self._entries.clear()
328 def catch_exceptions(orig_func):
329 """Catch uncaught exceptions and log them consistently.

    Decorator for FUSE request handlers: translates Python exceptions into
    llfuse.FUSEError with an appropriate errno so the kernel receives a
    sane error code instead of a crashed handler.
    """
331 @functools.wraps(orig_func)
332 def catch_exceptions_wrapper(self, *args, **kwargs):
# NOTE(review): the `try:` line wrapping the call below is elided from
# this view.
334 return orig_func(self, *args, **kwargs)
# FUSEError already carries the right errno -- presumably re-raised
# unchanged (the `raise` line is elided from this view).
335 except llfuse.FUSEError:
# OS-level errors keep their original errno.
337 except EnvironmentError as e:
338 raise llfuse.FUSEError(e.errno)
# Arvados storage errors are logged and surfaced to the kernel as EIO.
339 except arvados.errors.KeepWriteError as e:
340 _logger.error("Keep write error: " + str(e))
341 raise llfuse.FUSEError(errno.EIO)
342 except arvados.errors.NotFoundError as e:
343 _logger.error("Block not found error: " + str(e))
344 raise llfuse.FUSEError(errno.EIO)
# Catch-all: log with traceback, return EIO.  NOTE(review): the
# `except Exception:` line is elided from this view.
346 _logger.exception("Unhandled exception during FUSE operation")
347 raise llfuse.FUSEError(errno.EIO)
349 return catch_exceptions_wrapper
352 class Operations(llfuse.Operations):
353 """This is the main interface with llfuse.
355 The methods on this object are called by llfuse threads to service FUSE
356 events to query and read from the file system.
358 llfuse has its own global lock which is acquired before calling a request handler,
359 so request handlers do not run concurrently unless the lock is explicitly released
360 using 'with llfuse.lock_released:'
# Prometheus Summary with one labelled timer per FUSE operation; the
# per-op objects below are used as @<op>_time.time() decorators on the
# corresponding handlers.
364 fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
365 read_time = fuse_time.labels(op='read')
366 write_time = fuse_time.labels(op='write')
367 destroy_time = fuse_time.labels(op='destroy')
368 on_event_time = fuse_time.labels(op='on_event')
369 getattr_time = fuse_time.labels(op='getattr')
370 setattr_time = fuse_time.labels(op='setattr')
371 lookup_time = fuse_time.labels(op='lookup')
372 forget_time = fuse_time.labels(op='forget')
373 open_time = fuse_time.labels(op='open')
374 release_time = fuse_time.labels(op='release')
375 opendir_time = fuse_time.labels(op='opendir')
376 readdir_time = fuse_time.labels(op='readdir')
377 statfs_time = fuse_time.labels(op='statfs')
378 create_time = fuse_time.labels(op='create')
379 mkdir_time = fuse_time.labels(op='mkdir')
380 unlink_time = fuse_time.labels(op='unlink')
381 rmdir_time = fuse_time.labels(op='rmdir')
382 rename_time = fuse_time.labels(op='rename')
383 flush_time = fuse_time.labels(op='flush')
# uid/gid: ownership reported in stat results; api_client: Arvados API
# connection; inode_cache: optional InodeCache (defaults to a 256 MiB
# soft cap); num_retries: retry count for Keep operations; enable_write:
# when False the mount is effectively read-only.
385 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
386 super(Operations, self).__init__()
388 self._api_client = api_client
391 inode_cache = InodeCache(cap=256*1024*1024)
392 self.inodes = Inodes(inode_cache, encoding=encoding)
395 self.enable_write = enable_write
397 # dict of inode to filehandle
398 self._filehandles = {}
399 self._filehandles_counter = itertools.count(0)
401 # Other threads that need to wait until the fuse driver
402 # is fully initialized should wait() on this event object.
403 self.initlock = threading.Event()
405 # If we get overlapping shutdown events (e.g., fusermount -u
406 # -z and operations.destroy()) llfuse calls forget() on inodes
407 # that have already been deleted. To avoid this, we make
408 # forget() a no-op if called after destroy().
409 self._shutdown_started = threading.Event()
411 self.num_retries = num_retries
# Byte and operation counters for read/write statistics reporting.
413 self.read_counter = arvados.keep.Counter()
414 self.write_counter = arvados.keep.Counter()
415 self.read_ops_counter = arvados.keep.Counter()
416 self.write_ops_counter = arvados.keep.Counter()
421 # Allow threads that are waiting for the driver to be finished
422 # initializing to continue
# Return the raw Prometheus samples collected for the fuse_time metric.
425 def metric_samples(self):
426 return self.fuse_time.collect()[0].samples
# Collect the distinct operation names seen in the metric samples.
# NOTE(review): the `ops` initialization and return lines are elided from
# this view.
428 def metric_op_names(self):
430 for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
431 if cur_op not in ops:
# Look up a single sample value by operation name and metric name;
# returns None unless exactly one matching sample exists.
435 def metric_value(self, opname, metric):
436 op_value = [sample.value for sample in self.metric_samples()
437 if sample.name == metric and sample.labels['op'] == opname]
438 return op_value[0] if len(op_value) == 1 else None
# Factory helpers returning zero-argument callables that read the current
# cumulative time / call count for an operation (for stats reporting).
440 def metric_sum_func(self, opname):
441 return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
443 def metric_count_func(self, opname):
444 return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
# Shutdown path.  NOTE(review): the enclosing `def destroy(self):` line
# is elided from this view; the lines below are part of its body.
449 self._shutdown_started.set()
454 # Different versions of llfuse require and forbid us to
455 # acquire the lock here. See #8345#note-37, #10805#note-9.
456 if LLFUSE_VERSION_0 and llfuse.lock.acquire():
459 llfuse.lock.release()
# Permission check callback; body elided from this view -- presumably
# always grants access, leaving enforcement to per-op checks.  Confirm
# against the full file.
464 def access(self, inode, mode, ctx):
# Subscribe to the Arvados event bus so cached objects can be refreshed
# when they change server-side; the subscription filters to
# create/update/delete events (remaining arguments elided from this view).
467 def listen_for_events(self):
468 self.events = arvados.events.subscribe(
470 [["event_type", "in", ["create", "update", "delete"]]],
473 @on_event_time.time()
475 def on_event(self, ev):
# Ignore anything that is not a create/update/delete event (the early
# return after this check is elided from this view).
476 if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
479 properties = ev.get("properties") or {}
480 old_attrs = properties.get("old_attributes") or {}
481 new_attrs = properties.get("new_attributes") or {}
# Refresh every cached object matching the event's uuid.
483 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
485 if ev.get("object_kind") == "arvados#collection":
486 pdh = new_attrs.get("portable_data_hash")
487 # new_attributes.modified_at currently lacks
488 # subsecond precision (see #6347) so use event_at
489 # which should always be the same.
490 stamp = ev.get("event_at")
# Only push an update into a writable, locally-modified, non-trashed
# collection; pin it to the event's (timestamp, pdh) record version.
491 if (stamp and pdh and item.writable() and
492 item.collection is not None and
493 item.collection.modified() and
494 new_attrs.get("is_trashed") is not True):
495 item.update(to_record_version=(stamp, pdh))
# Notify old and new parent projects so the object moves between their
# directory listings (the enclosing `for parent in (...)` line is
# partially elided from this view).
497 oldowner = old_attrs.get("owner_uuid")
498 newowner = ev.get("object_owner_uuid")
500 self.inodes.inode_cache.find_by_uuid(oldowner) +
501 self.inodes.inode_cache.find_by_uuid(newowner)):
502 parent.child_event(ev)
506 def getattr(self, inode, ctx=None):
507 if inode not in self.inodes:
508 raise llfuse.FUSEError(errno.ENOENT)
510 e = self.inodes[inode]
512 entry = llfuse.EntryAttributes()
# entry_timeout 0: the kernel must always re-lookup names; attr_timeout
# follows the object's poll interval when attribute caching is allowed.
515 entry.entry_timeout = 0
516 entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
# Base mode: world-readable.  Directories get search bits + S_IFDIR;
# everything else is a regular file, with the execute bits added for
# Arvados files (so scripts in collections can be run).
518 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
519 if isinstance(e, Directory):
520 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
522 entry.st_mode |= stat.S_IFREG
523 if isinstance(e, FuseArvadosFile):
524 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
# Write bits only when the mount allows writes and the object itself is
# writable.
526 if self.enable_write and e.writable():
527 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
530 entry.st_uid = self.uid
531 entry.st_gid = self.gid
534 entry.st_size = e.size()
536 entry.st_blksize = 512
537 entry.st_blocks = (entry.st_size // 512) + 1
# Newer llfuse exposes nanosecond timestamp fields; fall back to the
# integer-second fields otherwise.
538 if hasattr(entry, 'st_atime_ns'):
540 entry.st_atime_ns = int(e.atime() * 1000000000)
541 entry.st_mtime_ns = int(e.mtime() * 1000000000)
542 entry.st_ctime_ns = int(e.mtime() * 1000000000)
545 entry.st_atime = int(e.atime)
546 entry.st_mtime = int(e.mtime)
547 entry.st_ctime = int(e.mtime)
# Only size changes (truncate) are supported, and only on Arvados files.
553 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
554 entry = self.getattr(inode)
556 if fh is not None and fh in self._filehandles:
557 handle = self._filehandles[fh]
560 e = self.inodes[inode]
# llfuse 0.x signals which fields changed via attr values; later versions
# pass an explicit `fields` object (branch structure partially elided
# from this view).
564 update_size = attr.st_size is not None
567 update_size = fields.update_size
568 if update_size and isinstance(e, FuseArvadosFile):
# Release the global lock during the (potentially slow) truncate.
569 with llfuse.lock_released:
570 e.arvfile.truncate(attr.st_size)
571 entry.st_size = e.arvfile.size()
577 def lookup(self, parent_inode, name, ctx=None):
# The kernel passes names as bytes; decode with the mount's encoding.
578 name = str(name, self.inodes.encoding)
584 if parent_inode in self.inodes:
585 p = self.inodes[parent_inode]
# '..' resolves via the parent pointer; other names via the directory's
# mapping (the '.'/'..' special-case lines are partially elided from
# this view).
588 inode = p.parent_inode
589 elif isinstance(p, Directory) and name in p:
590 inode = p[name].inode
593 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
594 parent_inode, name, inode)
# A successful lookup hands the kernel a reference; count it.
595 self.inodes[inode].inc_ref()
596 return self.getattr(inode)
598 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
600 raise llfuse.FUSEError(errno.ENOENT)
604 def forget(self, inodes):
# No-op after shutdown has begun (see __init__ comment about overlapping
# shutdown events).
605 if self._shutdown_started.is_set():
607 for inode, nlookup in inodes:
608 ent = self.inodes[inode]
609 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
# Drop the entry once the kernel's last reference is gone and the
# object is marked dead.
610 if ent.dec_ref(nlookup) == 0 and ent.dead:
611 self.inodes.del_entry(ent)
615 def open(self, inode, flags, ctx=None):
616 if inode in self.inodes:
617 p = self.inodes[inode]
619 raise llfuse.FUSEError(errno.ENOENT)
621 if isinstance(p, Directory):
622 raise llfuse.FUSEError(errno.EISDIR)
# Refuse write-mode opens on non-writable objects.
624 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
625 raise llfuse.FUSEError(errno.EPERM)
627 fh = next(self._filehandles_counter)
628 self._filehandles[fh] = FileHandle(fh, p)
631 # Normally, we will have received an "update" event if the
632 # parent collection is stale here. However, even if the parent
633 # collection hasn't changed, the manifest might have been
634 # fetched so long ago that the signatures on the data block
635 # locators have expired. Calling checkupdate() on all
636 # ancestors ensures the signatures will be refreshed if
# Walk up the parent chain (stopping at a self-parented root) to refresh
# stale ancestors; the checkupdate() call itself is elided from this view.
638 while p.parent_inode in self.inodes:
639 if p == self.inodes[p.parent_inode]:
641 p = self.inodes[p.parent_inode]
645 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
651 def read(self, fh, off, size):
652 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
653 self.read_ops_counter.add(1)
655 if fh in self._filehandles:
656 handle = self._filehandles[fh]
658 raise llfuse.FUSEError(errno.EBADF)
# Reading counts as recent use for cache-eviction purposes.
660 self.inodes.touch(handle.obj)
662 r = handle.obj.readfrom(off, size, self.num_retries)
664 self.read_counter.add(len(r))
669 def write(self, fh, off, buf):
670 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
671 self.write_ops_counter.add(1)
673 if fh in self._filehandles:
674 handle = self._filehandles[fh]
676 raise llfuse.FUSEError(errno.EBADF)
678 if not handle.obj.writable():
679 raise llfuse.FUSEError(errno.EPERM)
681 self.inodes.touch(handle.obj)
683 w = handle.obj.writeto(off, buf, self.num_retries)
685 self.write_counter.add(w)
690 def release(self, fh):
691 if fh in self._filehandles:
692 _logger.debug("arv-mount release fh %i", fh)
# Flush pending writes before tearing down the handle; error-translation
# lines between flush and release are elided from this view.
694 self._filehandles[fh].flush()
698 self._filehandles[fh].release()
699 del self._filehandles[fh]
# Closing a handle may make its object evictable; enforce the cache cap.
700 self.inodes.inode_cache.cap_cache()
# Directory handles share the file-handle release path (body elided from
# this view).
702 def releasedir(self, fh):
707 def opendir(self, inode, ctx=None):
708 _logger.debug("arv-mount opendir: inode %i", inode)
710 if inode in self.inodes:
711 p = self.inodes[inode]
713 raise llfuse.FUSEError(errno.ENOENT)
715 if not isinstance(p, Directory):
716 raise llfuse.FUSEError(errno.ENOTDIR)
718 fh = next(self._filehandles_counter)
719 if p.parent_inode in self.inodes:
720 parent = self.inodes[p.parent_inode]
722 raise llfuse.FUSEError(errno.EIO)
# Snapshot the listing now (including '.' and '..') so readdir() works
# from a stable copy.
727 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
732 def readdir(self, fh, off):
733 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
735 if fh in self._filehandles:
736 handle = self._filehandles[fh]
738 raise llfuse.FUSEError(errno.EBADF)
# Yield (name, attrs, next_offset) tuples starting from `off`, skipping
# entries whose inode has since disappeared.  NOTE(review): the `e = off`
# initialization and loop increment lines are elided from this view.
741 while e < len(handle.entries):
742 if handle.entries[e][1].inode in self.inodes:
743 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
748 def statfs(self, ctx=None):
749 st = llfuse.StatvfsData()
750 st.f_bsize = 128 * 1024
# Common precondition for all mutating operations: the mount must allow
# writes, the parent must exist, be a directory, and be writable.
# Returns the parent directory object.
763 def _check_writable(self, inode_parent):
764 if not self.enable_write:
765 raise llfuse.FUSEError(errno.EROFS)
767 if inode_parent in self.inodes:
768 p = self.inodes[inode_parent]
770 raise llfuse.FUSEError(errno.ENOENT)
772 if not isinstance(p, Directory):
773 raise llfuse.FUSEError(errno.ENOTDIR)
776 raise llfuse.FUSEError(errno.EPERM)
782 def create(self, inode_parent, name, mode, flags, ctx=None):
784 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
786 p = self._check_writable(inode_parent)
789 # The file entry should have been implicitly created by callback.
791 fh = next(self._filehandles_counter)
792 self._filehandles[fh] = FileHandle(fh, f)
796 return (fh, self.getattr(f.inode))
800 def mkdir(self, inode_parent, name, mode, ctx=None):
802 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
804 p = self._check_writable(inode_parent)
807 # The dir entry should have been implicitly created by callback.
811 return self.getattr(d.inode)
815 def unlink(self, inode_parent, name, ctx=None):
816 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
817 p = self._check_writable(inode_parent)
818 p.unlink(name.decode())
822 def rmdir(self, inode_parent, name, ctx=None):
823 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
824 p = self._check_writable(inode_parent)
825 p.rmdir(name.decode())
829 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
830 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
# Both the source and destination parents must pass the writability
# checks; the destination directory performs the move.
831 src = self._check_writable(inode_parent_old)
832 dest = self._check_writable(inode_parent_new)
833 dest.rename(name_old.decode(), name_new.decode(), src)
# Flush handler body (the `def flush` line is elided from this view).
838 if fh in self._filehandles:
839 self._filehandles[fh].flush()
# fsync/fsyncdir delegate to flush semantics; bodies elided from this
# view.
841 def fsync(self, fh, datasync):
844 def fsyncdir(self, fh, datasync):