1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
85 from prometheus_client import Summary
88 # Default _notify_queue has a limit of 1000 items, but it really needs to be
89 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
92 if hasattr(llfuse, 'capi'):
94 llfuse.capi._notify_queue = queue.Queue()
97 llfuse._notify_queue = queue.Queue()
99 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
101 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
102 from .fusefile import StringFile, FuseArvadosFile
104 _logger = logging.getLogger('arvados.arvados_fuse')
106 # Uncomment this to enable llfuse debug logging.
107 # log_handler = logging.StreamHandler()
108 # llogger = logging.getLogger('llfuse')
109 # llogger.addHandler(log_handler)
110 # llogger.setLevel(logging.DEBUG)
112 class Handle(object):
113 """Connects a numeric file handle to a File or Directory object that has
114 been opened by the client."""
116 def __init__(self, fh, obj):
128 class FileHandle(Handle):
129 """Connects a numeric file handle to a File object that has
130 been opened by the client."""
133 if self.obj.writable():
134 return self.obj.flush()
class DirectoryHandle(Handle):
    """File handle bound to a Directory object opened by the client.

    `entries` is the list of (name, object) pairs captured when the
    directory was opened; readdir() walks this list by offset.
    """

    def __init__(self, fh, dirobj, entries):
        # Let the base class bind the numeric handle to the directory object.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        # Snapshot of the listing taken at open time.
        self.entries = entries
146 class InodeCache(object):
147 """Records the memory footprint of objects and when they are last used.
149 When the cache limit is exceeded, the least recently used objects are
150 cleared. Clearing the object means discarding its contents to release
151 memory. The next time the object is accessed, it must be re-fetched from
152 the server. Note that the inode cache limit is a soft limit; the cache
153 limit may be exceeded if necessary to load very large objects, it may also
154 be exceeded if open file handles prevent objects from being cleared.
158 def __init__(self, cap, min_entries=4):
159 self._entries = collections.OrderedDict()
163 self.min_entries = min_entries
168 def _remove(self, obj, clear):
170 # Kernel behavior seems to be that if a file is
171 # referenced, its parents remain referenced too. This
172 # means has_ref() exits early when a collection is not
173 # candidate for eviction.
175 # By contrast, in_use() doesn't increment references on
176 # parents, so it requires a full tree walk to determine if
177 # a collection is a candidate for eviction. This takes
178 # .07s for 240000 files, which becomes a major drag when
179 # cap_cache is being called several times a second and
180 # there are multiple non-evictable collections in the
183 # So it is important for performance that we do the
184 # has_ref() check first.
186 if obj.has_ref(True):
187 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
191 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
194 obj.kernel_invalidate()
195 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
198 # The llfuse lock is released in del_entry(), which is called by
199 # Directory.clear(). While the llfuse lock is released, it can happen
200 # that a reentrant call removes this entry before this call gets to it.
201 # Ensure that the entry is still valid before trying to remove it.
202 if obj.inode not in self._entries:
205 self._total -= obj.cache_size
206 del self._entries[obj.inode]
208 self._by_uuid[obj.cache_uuid].remove(obj)
209 if not self._by_uuid[obj.cache_uuid]:
210 del self._by_uuid[obj.cache_uuid]
211 obj.cache_uuid = None
213 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
216 if self._total > self.cap:
217 for ent in listvalues(self._entries):
218 if self._total < self.cap or len(self._entries) < self.min_entries:
220 self._remove(ent, True)
222 def manage(self, obj):
224 obj.cache_size = obj.objsize()
225 self._entries[obj.inode] = obj
226 obj.cache_uuid = obj.uuid()
228 if obj.cache_uuid not in self._by_uuid:
229 self._by_uuid[obj.cache_uuid] = [obj]
231 if obj not in self._by_uuid[obj.cache_uuid]:
232 self._by_uuid[obj.cache_uuid].append(obj)
233 self._total += obj.objsize()
234 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
235 obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
238 def touch(self, obj):
240 if obj.inode in self._entries:
241 self._remove(obj, False)
def unmanage(self, obj):
    """Evict *obj* from the cache, clearing its contents.

    Nothing happens unless the object is persisted and currently has an
    entry in the cache.
    """
    if not obj.persisted():
        return
    if obj.inode not in self._entries:
        return
    self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return the list of cached objects registered under *uuid*.

    An unknown uuid yields a new empty list; the index itself is not
    modified by the lookup.
    """
    nothing_found = []
    return self._by_uuid.get(uuid, nothing_found)
252 self._entries.clear()
253 self._by_uuid.clear()
256 class Inodes(object):
257 """Manage the set of inodes. This is the mapping from a numeric id
258 to a concrete File or Directory object"""
260 def __init__(self, inode_cache, encoding="utf-8"):
262 self._counter = itertools.count(llfuse.ROOT_INODE)
263 self.inode_cache = inode_cache
264 self.encoding = encoding
265 self.deferred_invalidations = []
def __getitem__(self, item):
    """Return the File/Directory object registered for inode number *item*.

    Lookup errors from the underlying table (KeyError for a dict) propagate.
    """
    table = self._entries
    return table[item]
def __setitem__(self, key, item):
    """Register *item* as the object for inode number *key*."""
    table = self._entries
    table[key] = item
274 return iter(self._entries.keys())
277 return viewitems(self._entries.items())
def __contains__(self, k):
    """Return True if inode number *k* is currently registered."""
    registered = k in self._entries
    return registered
def touch(self, entry):
    """Record an access to *entry*.

    Stamps the entry's access time with the current wall-clock time, then
    lets the inode cache refresh its bookkeeping for the entry.
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
286 def add_entry(self, entry):
287 entry.inode = next(self._counter)
288 if entry.inode == llfuse.ROOT_INODE:
290 self._entries[entry.inode] = entry
291 self.inode_cache.manage(entry)
294 def del_entry(self, entry):
295 if entry.ref_count == 0:
296 self.inode_cache.unmanage(entry)
297 del self._entries[entry.inode]
298 with llfuse.lock_released:
303 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, entry):
    """Ask the kernel to invalidate its cache for *entry*'s inode.

    The call is skipped when entry.has_ref(False) is false. Invalidation only
    matters if the kernel has looked this inode up and not yet forgotten it.
    """
    if not entry.has_ref(False):
        return
    llfuse.invalidate_inode(entry.inode)
def invalidate_entry(self, entry, name):
    """Ask the kernel to forget the directory entry *name* inside *entry*.

    *name* is encoded with this table's encoding before being handed to
    llfuse. The call is skipped when entry.has_ref(False) is false. It is
    only needed if the kernel has looked the directory up and not yet
    forgotten it.
    """
    if not entry.has_ref(False):
        return
    encoded_name = name.encode(self.encoding)
    llfuse.invalidate_entry(entry.inode, native(encoded_name))
318 self.inode_cache.clear()
320 for k,v in viewitems(self._entries):
323 except Exception as e:
324 _logger.exception("Error during finalize of inode %i", k)
326 self._entries.clear()
329 def catch_exceptions(orig_func):
330 """Catch uncaught exceptions and log them consistently."""
332 @functools.wraps(orig_func)
333 def catch_exceptions_wrapper(self, *args, **kwargs):
335 return orig_func(self, *args, **kwargs)
336 except llfuse.FUSEError:
338 except EnvironmentError as e:
339 raise llfuse.FUSEError(e.errno)
340 except arvados.errors.KeepWriteError as e:
341 _logger.error("Keep write error: " + str(e))
342 raise llfuse.FUSEError(errno.EIO)
343 except arvados.errors.NotFoundError as e:
344 _logger.error("Block not found error: " + str(e))
345 raise llfuse.FUSEError(errno.EIO)
347 _logger.exception("Unhandled exception during FUSE operation")
348 raise llfuse.FUSEError(errno.EIO)
350 return catch_exceptions_wrapper
353 class Operations(llfuse.Operations):
354 """This is the main interface with llfuse.
356 The methods on this object are called by llfuse threads to service FUSE
357 events to query and read from the file system.
359 llfuse has its own global lock which is acquired before calling a request handler,
360 so request handlers do not run concurrently unless the lock is explicitly released
361 using 'with llfuse.lock_released:'
365 fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
366 read_time = fuse_time.labels(op='read')
367 write_time = fuse_time.labels(op='write')
368 destroy_time = fuse_time.labels(op='destroy')
369 on_event_time = fuse_time.labels(op='on_event')
370 getattr_time = fuse_time.labels(op='getattr')
371 setattr_time = fuse_time.labels(op='setattr')
372 lookup_time = fuse_time.labels(op='lookup')
373 forget_time = fuse_time.labels(op='forget')
374 open_time = fuse_time.labels(op='open')
375 release_time = fuse_time.labels(op='release')
376 opendir_time = fuse_time.labels(op='opendir')
377 readdir_time = fuse_time.labels(op='readdir')
378 statfs_time = fuse_time.labels(op='statfs')
379 create_time = fuse_time.labels(op='create')
380 mkdir_time = fuse_time.labels(op='mkdir')
381 unlink_time = fuse_time.labels(op='unlink')
382 rmdir_time = fuse_time.labels(op='rmdir')
383 rename_time = fuse_time.labels(op='rename')
384 flush_time = fuse_time.labels(op='flush')
386 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
387 super(Operations, self).__init__()
389 self._api_client = api_client
392 inode_cache = InodeCache(cap=256*1024*1024)
393 self.inodes = Inodes(inode_cache, encoding=encoding)
396 self.enable_write = enable_write
398 # dict of inode to filehandle
399 self._filehandles = {}
400 self._filehandles_counter = itertools.count(0)
402 # Other threads that need to wait until the fuse driver
403 # is fully initialized should wait() on this event object.
404 self.initlock = threading.Event()
406 # If we get overlapping shutdown events (e.g., fusermount -u
407 # -z and operations.destroy()) llfuse calls forget() on inodes
408 # that have already been deleted. To avoid this, we make
409 # forget() a no-op if called after destroy().
410 self._shutdown_started = threading.Event()
412 self.num_retries = num_retries
414 self.read_counter = arvados.keep.Counter()
415 self.write_counter = arvados.keep.Counter()
416 self.read_ops_counter = arvados.keep.Counter()
417 self.write_ops_counter = arvados.keep.Counter()
422 # Allow threads that are waiting for the driver to be finished
423 # initializing to continue
def metric_samples(self):
    """Return the samples of the first metric family collected from fuse_time."""
    families = self.fuse_time.collect()
    first_family = families[0]
    return first_family.samples
429 def metric_op_names(self):
431 for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
432 if cur_op not in ops:
def metric_value(self, opname, metric):
    """Return the value of sample *metric* whose 'op' label is *opname*.

    Returns None unless exactly one collected sample matches.
    """
    matches = []
    for sample in self.metric_samples():
        if sample.name == metric and sample.labels['op'] == opname:
            matches.append(sample.value)
    if len(matches) != 1:
        return None
    return matches[0]
def metric_sum_func(self, opname):
    """Return a zero-argument callable that reads the '_sum' sample for *opname*.

    The value is looked up each time the callable is invoked, not when it
    is created.
    """
    def read_sum():
        return self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
    return read_sum
def metric_count_func(self, opname):
    """Return a zero-argument callable that reads the '_count' sample for *opname* as an int.

    The value is looked up each time the callable is invoked. int() is
    applied to whatever metric_value() returns.
    """
    def read_count():
        raw = self.metric_value(opname, "arvmount_fuse_operations_seconds_count")
        return int(raw)
    return read_count
450 self._shutdown_started.set()
455 # Different versions of llfuse require and forbid us to
456 # acquire the lock here. See #8345#note-37, #10805#note-9.
457 if LLFUSE_VERSION_0 and llfuse.lock.acquire():
460 llfuse.lock.release()
465 def access(self, inode, mode, ctx):
468 def listen_for_events(self):
469 self.events = arvados.events.subscribe(
471 [["event_type", "in", ["create", "update", "delete"]]],
474 @on_event_time.time()
476 def on_event(self, ev):
477 if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
480 properties = ev.get("properties") or {}
481 old_attrs = properties.get("old_attributes") or {}
482 new_attrs = properties.get("new_attributes") or {}
484 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
486 if ev.get("object_kind") == "arvados#collection":
487 pdh = new_attrs.get("portable_data_hash")
488 # new_attributes.modified_at currently lacks
489 # subsecond precision (see #6347) so use event_at
490 # which should always be the same.
491 stamp = ev.get("event_at")
492 if (stamp and pdh and item.writable() and
493 item.collection is not None and
494 item.collection.modified() and
495 new_attrs.get("is_trashed") is not True):
496 item.update(to_record_version=(stamp, pdh))
498 oldowner = old_attrs.get("owner_uuid")
499 newowner = ev.get("object_owner_uuid")
501 self.inodes.inode_cache.find_by_uuid(oldowner) +
502 self.inodes.inode_cache.find_by_uuid(newowner)):
503 parent.child_event(ev)
507 def getattr(self, inode, ctx=None):
508 if inode not in self.inodes:
509 raise llfuse.FUSEError(errno.ENOENT)
511 e = self.inodes[inode]
513 entry = llfuse.EntryAttributes()
516 entry.entry_timeout = 0
517 entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
519 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
520 if isinstance(e, Directory):
521 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
523 entry.st_mode |= stat.S_IFREG
524 if isinstance(e, FuseArvadosFile):
525 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
527 if self.enable_write and e.writable():
528 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
531 entry.st_uid = self.uid
532 entry.st_gid = self.gid
535 entry.st_size = e.size()
537 entry.st_blksize = 512
538 entry.st_blocks = (entry.st_size // 512) + 1
539 if hasattr(entry, 'st_atime_ns'):
541 entry.st_atime_ns = int(e.atime() * 1000000000)
542 entry.st_mtime_ns = int(e.mtime() * 1000000000)
543 entry.st_ctime_ns = int(e.mtime() * 1000000000)
546 entry.st_atime = int(e.atime)
547 entry.st_mtime = int(e.mtime)
548 entry.st_ctime = int(e.mtime)
554 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
555 entry = self.getattr(inode)
557 if fh is not None and fh in self._filehandles:
558 handle = self._filehandles[fh]
561 e = self.inodes[inode]
565 update_size = attr.st_size is not None
568 update_size = fields.update_size
569 if update_size and isinstance(e, FuseArvadosFile):
570 with llfuse.lock_released:
571 e.arvfile.truncate(attr.st_size)
572 entry.st_size = e.arvfile.size()
578 def lookup(self, parent_inode, name, ctx=None):
579 name = str(name, self.inodes.encoding)
585 if parent_inode in self.inodes:
586 p = self.inodes[parent_inode]
589 inode = p.parent_inode
590 elif isinstance(p, Directory) and name in p:
591 inode = p[name].inode
594 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
595 parent_inode, name, inode)
596 self.inodes[inode].inc_ref()
597 return self.getattr(inode)
599 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
601 raise llfuse.FUSEError(errno.ENOENT)
605 def forget(self, inodes):
606 if self._shutdown_started.is_set():
608 for inode, nlookup in inodes:
609 ent = self.inodes[inode]
610 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
611 if ent.dec_ref(nlookup) == 0 and ent.dead:
612 self.inodes.del_entry(ent)
616 def open(self, inode, flags, ctx=None):
617 if inode in self.inodes:
618 p = self.inodes[inode]
620 raise llfuse.FUSEError(errno.ENOENT)
622 if isinstance(p, Directory):
623 raise llfuse.FUSEError(errno.EISDIR)
625 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
626 raise llfuse.FUSEError(errno.EPERM)
628 fh = next(self._filehandles_counter)
629 self._filehandles[fh] = FileHandle(fh, p)
632 # Normally, we will have received an "update" event if the
633 # parent collection is stale here. However, even if the parent
634 # collection hasn't changed, the manifest might have been
635 # fetched so long ago that the signatures on the data block
636 # locators have expired. Calling checkupdate() on all
637 # ancestors ensures the signatures will be refreshed if
639 while p.parent_inode in self.inodes:
640 if p == self.inodes[p.parent_inode]:
642 p = self.inodes[p.parent_inode]
646 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
652 def read(self, fh, off, size):
653 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
654 self.read_ops_counter.add(1)
656 if fh in self._filehandles:
657 handle = self._filehandles[fh]
659 raise llfuse.FUSEError(errno.EBADF)
661 self.inodes.touch(handle.obj)
663 r = handle.obj.readfrom(off, size, self.num_retries)
665 self.read_counter.add(len(r))
670 def write(self, fh, off, buf):
671 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
672 self.write_ops_counter.add(1)
674 if fh in self._filehandles:
675 handle = self._filehandles[fh]
677 raise llfuse.FUSEError(errno.EBADF)
679 if not handle.obj.writable():
680 raise llfuse.FUSEError(errno.EPERM)
682 self.inodes.touch(handle.obj)
684 w = handle.obj.writeto(off, buf, self.num_retries)
686 self.write_counter.add(w)
691 def release(self, fh):
692 if fh in self._filehandles:
693 _logger.debug("arv-mount release fh %i", fh)
695 self._filehandles[fh].flush()
699 self._filehandles[fh].release()
700 del self._filehandles[fh]
701 self.inodes.inode_cache.cap_cache()
703 def releasedir(self, fh):
708 def opendir(self, inode, ctx=None):
709 _logger.debug("arv-mount opendir: inode %i", inode)
711 if inode in self.inodes:
712 p = self.inodes[inode]
714 raise llfuse.FUSEError(errno.ENOENT)
716 if not isinstance(p, Directory):
717 raise llfuse.FUSEError(errno.ENOTDIR)
719 fh = next(self._filehandles_counter)
720 if p.parent_inode in self.inodes:
721 parent = self.inodes[p.parent_inode]
723 raise llfuse.FUSEError(errno.EIO)
727 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
732 def readdir(self, fh, off):
733 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
735 if fh in self._filehandles:
736 handle = self._filehandles[fh]
738 raise llfuse.FUSEError(errno.EBADF)
741 while e < len(handle.entries):
742 if handle.entries[e][1].inode in self.inodes:
743 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
748 def statfs(self, ctx=None):
749 st = llfuse.StatvfsData()
750 st.f_bsize = 128 * 1024
763 def _check_writable(self, inode_parent):
764 if not self.enable_write:
765 raise llfuse.FUSEError(errno.EROFS)
767 if inode_parent in self.inodes:
768 p = self.inodes[inode_parent]
770 raise llfuse.FUSEError(errno.ENOENT)
772 if not isinstance(p, Directory):
773 raise llfuse.FUSEError(errno.ENOTDIR)
776 raise llfuse.FUSEError(errno.EPERM)
782 def create(self, inode_parent, name, mode, flags, ctx=None):
783 name = name.decode(encoding=self.inodes.encoding)
784 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
786 p = self._check_writable(inode_parent)
789 # The file entry should have been implicitly created by callback.
791 fh = next(self._filehandles_counter)
792 self._filehandles[fh] = FileHandle(fh, f)
796 return (fh, self.getattr(f.inode))
800 def mkdir(self, inode_parent, name, mode, ctx=None):
801 name = name.decode(encoding=self.inodes.encoding)
802 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
804 p = self._check_writable(inode_parent)
807 # The dir entry should have been implicitly created by callback.
811 return self.getattr(d.inode)
815 def unlink(self, inode_parent, name, ctx=None):
816 name = name.decode(encoding=self.inodes.encoding)
817 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
818 p = self._check_writable(inode_parent)
823 def rmdir(self, inode_parent, name, ctx=None):
824 name = name.decode(encoding=self.inodes.encoding)
825 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
826 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
    """Move entry *name_old* in directory *inode_parent_old* to *name_new* in *inode_parent_new*.

    Both names arrive as bytes and are decoded with the inode table's
    encoding. Both parent inodes are validated with _check_writable(). The
    destination object it returns performs the actual move.
    """
    enc = self.inodes.encoding
    old_name = name_old.decode(encoding=enc)
    new_name = name_new.decode(encoding=enc)
    _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, old_name, inode_parent_new, new_name)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(old_name, new_name, source_dir)
842 if fh in self._filehandles:
843 self._filehandles[fh].flush()
845 def fsync(self, fh, datasync):
848 def fsyncdir(self, fh, datasync):