1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before
returning the requested item.
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
77 from prometheus_client import Summary
80 # Default _notify_queue has a limit of 1000 items, but it really needs to be
81 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
if hasattr(llfuse, 'capi'):
    # llfuse 0.x keeps its internals in the llfuse.capi extension module.
    llfuse.capi._notify_queue = queue.Queue()
else:
    # llfuse 1.x exposes _notify_queue directly on the package.
    llfuse._notify_queue = queue.Queue()

LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
93 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
94 from .fusefile import StringFile, FuseArvadosFile
96 _logger = logging.getLogger('arvados.arvados_fuse')
98 # Uncomment this to enable llfuse debug logging.
99 # log_handler = logging.StreamHandler()
100 # llogger = logging.getLogger('llfuse')
101 # llogger.addHandler(log_handler)
102 # llogger.setLevel(logging.DEBUG)
104 class Handle(object):
105 """Connects a numeric file handle to a File or Directory object that has
106 been opened by the client."""
108 def __init__(self, fh, obj):
120 class FileHandle(Handle):
121 """Connects a numeric file handle to a File object that has
122 been opened by the client."""
125 if self.obj.writable():
126 return self.obj.flush()
129 class DirectoryHandle(Handle):
130 """Connects a numeric file handle to a Directory object that has
131 been opened by the client."""
133 def __init__(self, fh, dirobj, entries):
134 super(DirectoryHandle, self).__init__(fh, dirobj)
135 self.entries = entries
136 for ent in self.entries:
140 for ent in self.entries:
142 super(DirectoryHandle, self).release()
145 class InodeCache(object):
146 """Records the memory footprint of objects and when they are last used.
148 When the cache limit is exceeded, the least recently used objects are
149 cleared. Clearing the object means discarding its contents to release
150 memory. The next time the object is accessed, it must be re-fetched from
151 the server. Note that the inode cache limit is a soft limit; the cache
152 limit may be exceeded if necessary to load very large objects, it may also
153 be exceeded if open file handles prevent objects from being cleared.
157 def __init__(self, cap, min_entries=4):
158 self._entries = collections.OrderedDict()
162 self.min_entries = min_entries
167 def _remove(self, obj, clear):
168 if obj.inode is None:
171 # Kernel behavior seems to be that if a file is
172 # referenced, its parents remain referenced too. This
173 # means has_ref() exits early when a collection is not
174 # candidate for eviction.
176 # By contrast, in_use() doesn't increment references on
177 # parents, so it requires a full tree walk to determine if
178 # a collection is a candidate for eviction. This takes
179 # .07s for 240000 files, which becomes a major drag when
180 # cap_cache is being called several times a second and
181 # there are multiple non-evictable collections in the
184 # So it is important for performance that we do the
185 # has_ref() check first.
187 if obj.has_ref(True):
188 #_logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
192 #_logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
195 obj.kernel_invalidate()
196 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
199 # The llfuse lock is released in del_entry(), which is called by
200 # Directory.clear(). While the llfuse lock is released, it can happen
201 # that a reentrant call removes this entry before this call gets to it.
202 # Ensure that the entry is still valid before trying to remove it.
203 if obj.inode not in self._entries:
206 self._total -= obj.cache_size
207 del self._entries[obj.inode]
209 self._by_uuid[obj.cache_uuid].remove(obj)
210 if not self._by_uuid[obj.cache_uuid]:
211 del self._by_uuid[obj.cache_uuid]
212 obj.cache_uuid = None
214 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
217 _logger.debug("in cap_cache %i, %i", self._total, self.cap)
218 if self._total > self.cap:
219 for ent in listvalues(self._entries):
220 if self._total < self.cap or len(self._entries) < self.min_entries:
222 self._remove(ent, True)
223 _logger.debug("end cap_cache %i, %i", self._total, self.cap)
225 def manage(self, obj):
227 obj.cache_size = obj.objsize()
228 self._entries[obj.inode] = obj
229 obj.cache_uuid = obj.uuid()
231 if obj.cache_uuid not in self._by_uuid:
232 self._by_uuid[obj.cache_uuid] = [obj]
234 if obj not in self._by_uuid[obj.cache_uuid]:
235 self._by_uuid[obj.cache_uuid].append(obj)
236 self._total += obj.cache_size
237 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
238 obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
240 def update_cache_size(self, obj):
241 if obj.inode in self._entries:
242 self._total -= obj.cache_size
243 obj.cache_size = obj.objsize()
244 self._total += obj.cache_size
246 def touch(self, obj):
248 if obj.inode in self._entries:
249 self._entries.move_to_end(obj.inode)
253 def unmanage(self, obj):
254 if obj.persisted() and obj.inode in self._entries:
255 self._remove(obj, True)
257 def find_by_uuid(self, uuid):
258 return self._by_uuid.get(uuid, [])
261 self._entries.clear()
262 self._by_uuid.clear()
265 class Inodes(object):
266 """Manage the set of inodes. This is the mapping from a numeric id
267 to a concrete File or Directory object"""
269 def __init__(self, inode_cache, encoding="utf-8"):
271 self._counter = itertools.count(llfuse.ROOT_INODE)
272 self.inode_cache = inode_cache
273 self.encoding = encoding
274 self.deferred_invalidations = []
276 def __getitem__(self, item):
277 return self._entries[item]
279 def __setitem__(self, key, item):
280 self._entries[key] = item
283 return iter(self._entries.keys())
286 return viewitems(self._entries.items())
288 def __contains__(self, k):
289 return k in self._entries
291 def touch(self, entry):
292 entry._atime = time.time()
293 self.inode_cache.touch(entry)
295 def add_entry(self, entry):
296 entry.inode = next(self._counter)
297 if entry.inode == llfuse.ROOT_INODE:
299 self._entries[entry.inode] = entry
300 self.inode_cache.manage(entry)
303 def del_entry(self, entry):
304 if entry.ref_count == 0:
305 self.inode_cache.unmanage(entry)
306 del self._entries[entry.inode]
307 with llfuse.lock_released:
312 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
314 def invalidate_inode(self, entry):
315 if entry.has_ref(False):
316 # Only necessary if the kernel has previously done a lookup on this
317 # inode and hasn't yet forgotten about it.
318 llfuse.invalidate_inode(entry.inode)
320 def invalidate_entry(self, entry, name):
321 if entry.has_ref(False):
322 # Only necessary if the kernel has previously done a lookup on this
323 # inode and hasn't yet forgotten about it.
324 llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
327 self.inode_cache.clear()
329 for k,v in viewitems(self._entries):
332 except Exception as e:
333 _logger.exception("Error during finalize of inode %i", k)
335 self._entries.clear()
def catch_exceptions(orig_func):
    """Decorator for FUSE request handlers: log uncaught exceptions consistently
    and translate them into llfuse.FUSEError, the only exception type llfuse
    can report back to the kernel (as an errno)."""

    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            # Already in the form llfuse expects; propagate unchanged.
            raise
        except EnvironmentError as e:
            # OSError and friends already carry a usable errno.
            raise llfuse.FUSEError(e.errno)
        except NotImplementedError:
            raise llfuse.FUSEError(errno.ENOTSUP)
        except arvados.errors.KeepWriteError as e:
            _logger.error("Keep write error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except arvados.errors.NotFoundError as e:
            _logger.error("Block not found error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # are not swallowed; everything else is logged with traceback.
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
364 class Operations(llfuse.Operations):
365 """This is the main interface with llfuse.
367 The methods on this object are called by llfuse threads to service FUSE
368 events to query and read from the file system.
370 llfuse has its own global lock which is acquired before calling a request handler,
371 so request handlers do not run concurrently unless the lock is explicitly released
372 using 'with llfuse.lock_released:'
376 fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
377 read_time = fuse_time.labels(op='read')
378 write_time = fuse_time.labels(op='write')
379 destroy_time = fuse_time.labels(op='destroy')
380 on_event_time = fuse_time.labels(op='on_event')
381 getattr_time = fuse_time.labels(op='getattr')
382 setattr_time = fuse_time.labels(op='setattr')
383 lookup_time = fuse_time.labels(op='lookup')
384 forget_time = fuse_time.labels(op='forget')
385 open_time = fuse_time.labels(op='open')
386 release_time = fuse_time.labels(op='release')
387 opendir_time = fuse_time.labels(op='opendir')
388 readdir_time = fuse_time.labels(op='readdir')
389 statfs_time = fuse_time.labels(op='statfs')
390 create_time = fuse_time.labels(op='create')
391 mkdir_time = fuse_time.labels(op='mkdir')
392 unlink_time = fuse_time.labels(op='unlink')
393 rmdir_time = fuse_time.labels(op='rmdir')
394 rename_time = fuse_time.labels(op='rename')
395 flush_time = fuse_time.labels(op='flush')
397 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
398 super(Operations, self).__init__()
400 self._api_client = api_client
403 inode_cache = InodeCache(cap=256*1024*1024)
404 self.inodes = Inodes(inode_cache, encoding=encoding)
407 self.enable_write = enable_write
409 # dict of inode to filehandle
410 self._filehandles = {}
411 self._filehandles_counter = itertools.count(0)
413 # Other threads that need to wait until the fuse driver
414 # is fully initialized should wait() on this event object.
415 self.initlock = threading.Event()
417 # If we get overlapping shutdown events (e.g., fusermount -u
418 # -z and operations.destroy()) llfuse calls forget() on inodes
419 # that have already been deleted. To avoid this, we make
420 # forget() a no-op if called after destroy().
421 self._shutdown_started = threading.Event()
423 self.num_retries = num_retries
425 self.read_counter = arvados.keep.Counter()
426 self.write_counter = arvados.keep.Counter()
427 self.read_ops_counter = arvados.keep.Counter()
428 self.write_ops_counter = arvados.keep.Counter()
433 # Allow threads that are waiting for the driver to be finished
434 # initializing to continue
437 def metric_samples(self):
438 return self.fuse_time.collect()[0].samples
440 def metric_op_names(self):
442 for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
443 if cur_op not in ops:
447 def metric_value(self, opname, metric):
448 op_value = [sample.value for sample in self.metric_samples()
449 if sample.name == metric and sample.labels['op'] == opname]
450 return op_value[0] if len(op_value) == 1 else None
452 def metric_sum_func(self, opname):
453 return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
455 def metric_count_func(self, opname):
456 return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
461 self._shutdown_started.set()
466 # Different versions of llfuse require and forbid us to
467 # acquire the lock here. See #8345#note-37, #10805#note-9.
468 if LLFUSE_VERSION_0 and llfuse.lock.acquire():
471 llfuse.lock.release()
476 def access(self, inode, mode, ctx):
479 def listen_for_events(self):
480 self.events = arvados.events.subscribe(
482 [["event_type", "in", ["create", "update", "delete"]]],
485 @on_event_time.time()
487 def on_event(self, ev):
488 if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
491 properties = ev.get("properties") or {}
492 old_attrs = properties.get("old_attributes") or {}
493 new_attrs = properties.get("new_attributes") or {}
495 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
498 oldowner = old_attrs.get("owner_uuid")
499 newowner = ev.get("object_owner_uuid")
501 self.inodes.inode_cache.find_by_uuid(oldowner) +
502 self.inodes.inode_cache.find_by_uuid(newowner)):
507 def getattr(self, inode, ctx=None):
508 if inode not in self.inodes:
509 raise llfuse.FUSEError(errno.ENOENT)
511 e = self.inodes[inode]
513 entry = llfuse.EntryAttributes()
516 entry.entry_timeout = 0
517 entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
519 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
520 if isinstance(e, Directory):
521 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
523 entry.st_mode |= stat.S_IFREG
524 if isinstance(e, FuseArvadosFile):
525 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
527 if self.enable_write and e.writable():
528 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
531 entry.st_uid = self.uid
532 entry.st_gid = self.gid
535 entry.st_size = e.size()
537 entry.st_blksize = 512
538 entry.st_blocks = (entry.st_size // 512) + 1
539 if hasattr(entry, 'st_atime_ns'):
541 entry.st_atime_ns = int(e.atime() * 1000000000)
542 entry.st_mtime_ns = int(e.mtime() * 1000000000)
543 entry.st_ctime_ns = int(e.mtime() * 1000000000)
546 entry.st_atime = int(e.atime)
547 entry.st_mtime = int(e.mtime)
548 entry.st_ctime = int(e.mtime)
554 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
555 entry = self.getattr(inode)
557 if fh is not None and fh in self._filehandles:
558 handle = self._filehandles[fh]
561 e = self.inodes[inode]
565 update_size = attr.st_size is not None
568 update_size = fields.update_size
569 if update_size and isinstance(e, FuseArvadosFile):
570 with llfuse.lock_released:
571 e.arvfile.truncate(attr.st_size)
572 entry.st_size = e.arvfile.size()
578 def lookup(self, parent_inode, name, ctx=None):
579 name = str(name, self.inodes.encoding)
584 elif parent_inode in self.inodes:
585 p = self.inodes[parent_inode]
588 inode = p.parent_inode
589 elif isinstance(p, Directory) and name in p:
590 inode = p[name].inode
593 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
594 parent_inode, name, inode)
595 self.inodes[inode].inc_ref()
596 return self.getattr(inode)
598 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
600 raise llfuse.FUSEError(errno.ENOENT)
604 def forget(self, inodes):
605 if self._shutdown_started.is_set():
607 for inode, nlookup in inodes:
608 ent = self.inodes[inode]
609 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
610 if ent.dec_ref(nlookup) == 0 and ent.dead:
611 self.inodes.del_entry(ent)
615 def open(self, inode, flags, ctx=None):
616 if inode in self.inodes:
617 p = self.inodes[inode]
619 raise llfuse.FUSEError(errno.ENOENT)
621 if isinstance(p, Directory):
622 raise llfuse.FUSEError(errno.EISDIR)
624 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
625 raise llfuse.FUSEError(errno.EPERM)
627 fh = next(self._filehandles_counter)
628 self._filehandles[fh] = FileHandle(fh, p)
631 # Normally, we will have received an "update" event if the
632 # parent collection is stale here. However, even if the parent
633 # collection hasn't changed, the manifest might have been
634 # fetched so long ago that the signatures on the data block
635 # locators have expired. Calling checkupdate() on all
636 # ancestors ensures the signatures will be refreshed if
638 while p.parent_inode in self.inodes:
639 if p == self.inodes[p.parent_inode]:
641 p = self.inodes[p.parent_inode]
645 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
651 def read(self, fh, off, size):
652 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
653 self.read_ops_counter.add(1)
655 if fh in self._filehandles:
656 handle = self._filehandles[fh]
658 raise llfuse.FUSEError(errno.EBADF)
660 self.inodes.touch(handle.obj)
662 r = handle.obj.readfrom(off, size, self.num_retries)
664 self.read_counter.add(len(r))
669 def write(self, fh, off, buf):
670 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
671 self.write_ops_counter.add(1)
673 if fh in self._filehandles:
674 handle = self._filehandles[fh]
676 raise llfuse.FUSEError(errno.EBADF)
678 if not handle.obj.writable():
679 raise llfuse.FUSEError(errno.EPERM)
681 self.inodes.touch(handle.obj)
683 w = handle.obj.writeto(off, buf, self.num_retries)
685 self.write_counter.add(w)
690 def release(self, fh):
691 if fh in self._filehandles:
692 _logger.debug("arv-mount release fh %i", fh)
694 self._filehandles[fh].flush()
698 self._filehandles[fh].release()
699 del self._filehandles[fh]
700 self.inodes.inode_cache.cap_cache()
702 def releasedir(self, fh):
707 def opendir(self, inode, ctx=None):
708 _logger.debug("arv-mount opendir: inode %i", inode)
710 if inode in self.inodes:
711 p = self.inodes[inode]
713 raise llfuse.FUSEError(errno.ENOENT)
715 if not isinstance(p, Directory):
716 raise llfuse.FUSEError(errno.ENOTDIR)
718 fh = next(self._filehandles_counter)
719 if p.parent_inode in self.inodes:
720 parent = self.inodes[p.parent_inode]
722 _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
723 raise llfuse.FUSEError(errno.EIO)
725 _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
729 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
734 def readdir(self, fh, off):
735 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
737 if fh in self._filehandles:
738 handle = self._filehandles[fh]
740 raise llfuse.FUSEError(errno.EBADF)
743 while e < len(handle.entries):
744 ent = handle.entries[e]
745 if ent[1].inode in self.inodes:
746 yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
751 def statfs(self, ctx=None):
752 st = llfuse.StatvfsData()
753 st.f_bsize = 128 * 1024
766 def _check_writable(self, inode_parent):
767 if not self.enable_write:
768 raise llfuse.FUSEError(errno.EROFS)
770 if inode_parent in self.inodes:
771 p = self.inodes[inode_parent]
773 raise llfuse.FUSEError(errno.ENOENT)
775 if not isinstance(p, Directory):
776 raise llfuse.FUSEError(errno.ENOTDIR)
779 raise llfuse.FUSEError(errno.EPERM)
785 def create(self, inode_parent, name, mode, flags, ctx=None):
786 name = name.decode(encoding=self.inodes.encoding)
787 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
789 p = self._check_writable(inode_parent)
792 # The file entry should have been implicitly created by callback.
794 fh = next(self._filehandles_counter)
795 self._filehandles[fh] = FileHandle(fh, f)
799 return (fh, self.getattr(f.inode))
803 def mkdir(self, inode_parent, name, mode, ctx=None):
804 name = name.decode(encoding=self.inodes.encoding)
805 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
807 p = self._check_writable(inode_parent)
810 # The dir entry should have been implicitly created by callback.
814 return self.getattr(d.inode)
818 def unlink(self, inode_parent, name, ctx=None):
819 name = name.decode(encoding=self.inodes.encoding)
820 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
821 p = self._check_writable(inode_parent)
826 def rmdir(self, inode_parent, name, ctx=None):
827 name = name.decode(encoding=self.inodes.encoding)
828 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
829 p = self._check_writable(inode_parent)
834 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
835 name_old = name_old.decode(encoding=self.inodes.encoding)
836 name_new = name_new.decode(encoding=self.inodes.encoding)
837 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
838 src = self._check_writable(inode_parent_old)
839 dest = self._check_writable(inode_parent_new)
840 dest.rename(name_old, name_new, src)
845 if fh in self._filehandles:
846 self._filehandles[fh].flush()
848 def fsync(self, fh, datasync):
851 def fsyncdir(self, fh, datasync):