1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before
returning the requested item.
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
74 from prometheus_client import Summary
# Default _notify_queue has a limit of 1000 items, but it really needs to be
# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43
# for details.
# Replace llfuse's bounded notify queue with an unbounded Queue.  The
# attribute lives under 'capi' on older llfuse releases, so probe for the
# old location first.
if hasattr(llfuse, 'capi'):
    llfuse.capi._notify_queue = Queue.Queue()
# NOTE(review): an 'else:' introducing the branch below appears to be
# elided in this copy; this assignment targets the newer llfuse layout.
llfuse._notify_queue = Queue.Queue()

# True when running against the llfuse 0.x series, whose locking API
# differs (consulted during shutdown, see Operations.destroy).
LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
91 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
92 from fusefile import StringFile, FuseArvadosFile
# Module-level logger for this package.
_logger = logging.getLogger('arvados.arvados_fuse')

# Uncomment this to enable llfuse debug logging.
# log_handler = logging.StreamHandler()
# llogger = logging.getLogger('llfuse')
# llogger.addHandler(log_handler)
# llogger.setLevel(logging.DEBUG)
class Handle(object):
    """Connects a numeric file handle to a File or Directory object that has
    been opened by the client."""

    # NOTE(review): the body of __init__ (presumably storing fh/obj and
    # taking a use reference on obj) appears to be elided in this copy.
    def __init__(self, fh, obj):
class FileHandle(Handle):
    """Connects a numeric file handle to a File object that has
    been opened by the client."""

    # NOTE(review): the 'def flush(self):' header for the statements below
    # appears to be elided in this copy.  Flushing is delegated to the
    # underlying object's flush() only when it is writable; read-only
    # files need no action.
        if self.obj.writable():
            return self.obj.flush()
class DirectoryHandle(Handle):
    """Connects a numeric file handle to a Directory object that has
    been opened by the client."""

    def __init__(self, fh, dirobj, entries):
        super(DirectoryHandle, self).__init__(fh, dirobj)
        # Listing captured at opendir() time so readdir() can page through
        # a stable sequence of (name, object) pairs.
        self.entries = entries
class InodeCache(object):
    """Records the memory footprint of objects and when they are last used.

    When the cache limit is exceeded, the least recently used objects are
    cleared. Clearing the object means discarding its contents to release
    memory. The next time the object is accessed, it must be re-fetched from
    the server. Note that the inode cache limit is a soft limit; the cache
    limit may be exceeded if necessary to load very large objects, it may also
    be exceeded if open file handles prevent objects from being cleared.
    """

    def __init__(self, cap, min_entries=4):
        # Ordered mapping of inode number -> object; insertion order gives
        # the (approximate) least-recently-used eviction order.
        self._entries = collections.OrderedDict()
        # NOTE(review): initialization of self.cap, self._by_uuid and
        # self._total appears to be elided in this copy; all three are
        # used by the methods below.
        self.min_entries = min_entries

    def _remove(self, obj, clear):
        # Kernel behavior seems to be that if a file is
        # referenced, its parents remain referenced too. This
        # means has_ref() exits early when a collection is not
        # candidate for eviction.
        #
        # By contrast, in_use() doesn't increment references on
        # parents, so it requires a full tree walk to determine if
        # a collection is a candidate for eviction. This takes
        # .07s for 240000 files, which becomes a major drag when
        # cap_cache is being called several times a second and
        # there are multiple non-evictable collections in the
        # cache.
        #
        # So it is important for performance that we do the
        # has_ref() check first.
        if obj.has_ref(True):
            _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
        # NOTE(review): the early return for the branch above, the in_use()
        # check guarding the next message, and the clear/invalidate control
        # flow appear to be elided in this copy.
        _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
        obj.kernel_invalidate()
        _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)

        # The llfuse lock is released in del_entry(), which is called by
        # Directory.clear(). While the llfuse lock is released, it can happen
        # that a reentrant call removes this entry before this call gets to it.
        # Ensure that the entry is still valid before trying to remove it.
        if obj.inode not in self._entries:
            # NOTE(review): the early 'return' for this guard appears to be
            # elided in this copy.
        # Drop the entry's size from the running total and forget it.
        self._total -= obj.cache_size
        del self._entries[obj.inode]
        # Also remove it from the uuid -> objects reverse index, deleting
        # the index entry once it becomes empty.
        self._by_uuid[obj.cache_uuid].remove(obj)
        if not self._by_uuid[obj.cache_uuid]:
            del self._by_uuid[obj.cache_uuid]
        obj.cache_uuid = None
        _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)

    # NOTE(review): the 'def cap_cache(self):' header appears to be elided
    # in this copy.  The statements below evict least-recently-used entries
    # while the cache is over its cap, keeping at least min_entries.
        if self._total > self.cap:
            for ent in self._entries.values():
                if self._total < self.cap or len(self._entries) < self.min_entries:
                    # NOTE(review): the loop-exit statement for this guard
                    # appears to be elided in this copy.
                self._remove(ent, True)

    def manage(self, obj):
        # Begin (or refresh) accounting for obj in the cache.
        # NOTE(review): a guard (likely 'if obj.persisted():', matching
        # unmanage below) appears to be elided before this body.
        obj.cache_size = obj.objsize()
        self._entries[obj.inode] = obj
        obj.cache_uuid = obj.uuid()
        if obj.cache_uuid not in self._by_uuid:
            self._by_uuid[obj.cache_uuid] = [obj]
        # NOTE(review): an 'else:' joining this branch to the previous one
        # appears to be elided in this copy.
        if obj not in self._by_uuid[obj.cache_uuid]:
            self._by_uuid[obj.cache_uuid].append(obj)
        self._total += obj.objsize()
        _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
                      obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))

    def touch(self, obj):
        # Mark obj as recently used.  _remove with clear=False drops the
        # bookkeeping without discarding contents.
        # NOTE(review): a persisted() guard and a re-manage step appear to
        # be elided in this copy.
        if obj.inode in self._entries:
            self._remove(obj, False)

    def unmanage(self, obj):
        # Stop tracking obj entirely, clearing its contents if cached.
        if obj.persisted() and obj.inode in self._entries:
            self._remove(obj, True)

    def find_by_uuid(self, uuid):
        # All cached objects with this Arvados uuid (empty list if none).
        return self._by_uuid.get(uuid, [])

    # NOTE(review): the 'def clear(self):' header appears to be elided in
    # this copy; the statements below discard all cache bookkeeping.
        self._entries.clear()
        self._by_uuid.clear()
class Inodes(object):
    """Manage the set of inodes. This is the mapping from a numeric id
    to a concrete File or Directory object"""

    def __init__(self, inode_cache, encoding="utf-8"):
        # NOTE(review): initialization of the backing self._entries mapping
        # appears to be elided in this copy.
        # Inode numbers are handed out sequentially starting at the root.
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.inode_cache = inode_cache
        self.encoding = encoding
        self.deferred_invalidations = []

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    # NOTE(review): the '__iter__' method header for the statement below
    # appears to be elided in this copy.
        return self._entries.iterkeys()

    # NOTE(review): the 'items()' method header for the statement below
    # appears to be elided in this copy.
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def touch(self, entry):
        # Record an access: update the entry's atime and bump it in the
        # inode cache's recently-used ordering.
        entry._atime = time.time()
        self.inode_cache.touch(entry)

    def add_entry(self, entry):
        # Assign the next free inode number and start tracking the entry.
        entry.inode = next(self._counter)
        if entry.inode == llfuse.ROOT_INODE:
            # NOTE(review): the body handling a draw of ROOT_INODE appears
            # to be elided in this copy.
        self._entries[entry.inode] = entry
        self.inode_cache.manage(entry)
        # NOTE(review): a 'return entry' tail appears to be elided here
        # (callers elsewhere use add_entry's return value).

    def del_entry(self, entry):
        # Remove the entry once the kernel holds no references to it.
        if entry.ref_count == 0:
            self.inode_cache.unmanage(entry)
            del self._entries[entry.inode]
            with llfuse.lock_released:
                # NOTE(review): the finalization performed while the llfuse
                # lock is released appears to be elided in this copy, along
                # with the 'else:' branch (still-referenced entries) that
                # the debug line below presumably belongs to.
        _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)

    def invalidate_inode(self, entry):
        if entry.has_ref(False):
            # Only necessary if the kernel has previously done a lookup on this
            # inode and hasn't yet forgotten about it.
            llfuse.invalidate_inode(entry.inode)

    def invalidate_entry(self, entry, name):
        if entry.has_ref(False):
            # Only necessary if the kernel has previously done a lookup on this
            # inode and hasn't yet forgotten about it.
            llfuse.invalidate_entry(entry.inode, name.encode(self.encoding))

    # NOTE(review): the 'clear()' method header for the teardown below
    # appears to be elided in this copy.
        self.inode_cache.clear()

        for k,v in self._entries.items():
            # NOTE(review): the 'try:' block (presumably finalizing each
            # entry) that this handler belongs to is elided in this copy.
            except Exception as e:
                _logger.exception("Error during finalize of inode %i", k)

        self._entries.clear()
def catch_exceptions(orig_func):
    """Catch uncaught exceptions and log them consistently.

    Decorator for FUSE request handlers.  llfuse expects handlers to signal
    errors only by raising llfuse.FUSEError, so every other exception type
    is logged and translated to an appropriate errno:

    - llfuse.FUSEError passes through untouched.
    - EnvironmentError keeps its own errno.
    - Keep write / block-not-found errors and anything unexpected become EIO.
    """
    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            # Already the error type llfuse understands; re-raise as-is.
            raise
        except EnvironmentError as e:
            raise llfuse.FUSEError(e.errno)
        except arvados.errors.KeepWriteError as e:
            _logger.error("Keep write error: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
        except arvados.errors.NotFoundError as e:
            _logger.error("Block not found error: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
class Operations(llfuse.Operations):
    """This is the main interface with llfuse.

    The methods on this object are called by llfuse threads to service FUSE
    events to query and read from the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using 'with llfuse.lock_released:'
    """

    # One shared Prometheus summary metric, labelled per FUSE operation.
    # The per-op label objects below are used as @<op>_time.time()
    # decorators on the corresponding handlers.
    fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
    read_time = fuse_time.labels(op='read')
    write_time = fuse_time.labels(op='write')
    destroy_time = fuse_time.labels(op='destroy')
    on_event_time = fuse_time.labels(op='on_event')
    getattr_time = fuse_time.labels(op='getattr')
    setattr_time = fuse_time.labels(op='setattr')
    lookup_time = fuse_time.labels(op='lookup')
    forget_time = fuse_time.labels(op='forget')
    open_time = fuse_time.labels(op='open')
    release_time = fuse_time.labels(op='release')
    opendir_time = fuse_time.labels(op='opendir')
    readdir_time = fuse_time.labels(op='readdir')
    statfs_time = fuse_time.labels(op='statfs')
    create_time = fuse_time.labels(op='create')
    mkdir_time = fuse_time.labels(op='mkdir')
    unlink_time = fuse_time.labels(op='unlink')
    rmdir_time = fuse_time.labels(op='rmdir')
    rename_time = fuse_time.labels(op='rename')
    flush_time = fuse_time.labels(op='flush')
    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
        super(Operations, self).__init__()

        self._api_client = api_client

        # NOTE(review): storage of uid/gid and the 'inode_cache is None'
        # guard for the default below appear to be elided in this copy
        # (getattr reads self.uid / self.gid).
            inode_cache = InodeCache(cap=256*1024*1024)
        self.inodes = Inodes(inode_cache, encoding=encoding)

        # When False the mount is read-only: _check_writable() and the
        # permission bits in getattr() consult this flag.
        self.enable_write = enable_write

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = itertools.count(0)

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

        # If we get overlapping shutdown events (e.g., fusermount -u
        # -z and operations.destroy()) llfuse calls forget() on inodes
        # that have already been deleted. To avoid this, we make
        # forget() a no-op if called after destroy().
        self._shutdown_started = threading.Event()

        self.num_retries = num_retries

        # Byte/operation counters updated by read() and write().
        self.read_counter = arvados.keep.Counter()
        self.write_counter = arvados.keep.Counter()
        self.read_ops_counter = arvados.keep.Counter()
        self.write_ops_counter = arvados.keep.Counter()

        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        # NOTE(review): the statement this comment describes (setting
        # initlock) — and possibly the separate handler containing it —
        # appears to be elided in this copy.
    def metric_samples(self):
        # All samples recorded so far for the shared fuse_time summary.
        return self.fuse_time.collect()[0].samples
    def metric_op_names(self):
        # Collect the distinct 'op' label values seen in the metric samples.
        # NOTE(review): the accumulator initialization ('ops'), the append
        # inside the condition, and the return appear to be elided in this
        # copy.
        for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
            if cur_op not in ops:
426 def metric_value(self, opname, metric):
427 op_value = [sample.value for sample in self.metric_samples()
428 if sample.name == metric and sample.labels['op'] == opname]
429 return op_value[0] if len(op_value) == 1 else None
431 def metric_sum_func(self, opname):
432 return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
434 def metric_count_func(self, opname):
435 return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
    # NOTE(review): the 'def destroy(self):' header (and its timing
    # decorator) appears to be elided in this copy; the statements below
    # are the shutdown path.
        # Signal shutdown so forget() becomes a no-op (see __init__).
        self._shutdown_started.set()
        # NOTE(review): additional teardown statements appear to be elided
        # here.

        # Different versions of llfuse require and forbid us to
        # acquire the lock here. See #8345#note-37, #10805#note-9.
        if LLFUSE_VERSION_0 and llfuse.lock.acquire():
            # NOTE(review): the work performed while holding the lock (and
            # the non-0.x branch) appears to be elided in this copy.
            llfuse.lock.release()
    # NOTE(review): the body of this llfuse access-check handler is elided
    # in this copy.
    def access(self, inode, mode, ctx):
    def listen_for_events(self):
        # Subscribe to the Arvados event bus so cached objects can be
        # refreshed when they change server-side (handled by on_event).
        self.events = arvados.events.subscribe(
            # NOTE(review): the api-client argument appears to be elided
            # in this copy.
            [["event_type", "in", ["create", "update", "delete"]]],
            # NOTE(review): the callback argument (presumably on_event)
            # appears to be elided in this copy.
    @on_event_time.time()
    def on_event(self, ev):
        """Handle a message from the Arvados event bus, refreshing cached objects."""
        # Ignore messages that are not object change events.
        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
            # NOTE(review): the early 'return' for this guard appears to be
            # elided in this copy.

        # Update events carry before/after attribute snapshots.
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}

        # Refresh every cached object with the changed uuid.
        for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
            # NOTE(review): one or more statements (likely invalidating the
            # item) appear to be elided at the top of this loop body.
            if ev.get("object_kind") == "arvados#collection":
                pdh = new_attrs.get("portable_data_hash")
                # new_attributes.modified_at currently lacks
                # subsecond precision (see #6347) so use event_at
                # which should always be the same.
                stamp = ev.get("event_at")
                if (stamp and pdh and item.writable() and
                    item.collection is not None and
                    item.collection.modified() and
                    new_attrs.get("is_trashed") is not True):
                    item.update(to_record_version=(stamp, pdh))

        # Notify both the old and new owner projects of an ownership change
        # so their listings refresh.
        oldowner = old_attrs.get("owner_uuid")
        newowner = ev.get("object_owner_uuid")
        # NOTE(review): the 'for parent in (' header for the iteration below
        # appears to be elided in this copy.
            self.inodes.inode_cache.find_by_uuid(oldowner) +
            self.inodes.inode_cache.find_by_uuid(newowner)):
            parent.child_event(ev)
    def getattr(self, inode, ctx=None):
        """Build an llfuse.EntryAttributes for the given inode."""
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        # NOTE(review): assignments of identity fields (st_ino etc.) appear
        # to be elided in this copy.
        entry.entry_timeout = 0
        # Let the kernel cache attributes until the next scheduled poll,
        # unless the entry forbids attribute caching.
        entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0

        # Base permissions: world-readable.
        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        # NOTE(review): the 'else:' joining the regular-file branch below
        # appears to be elided in this copy.
            entry.st_mode |= stat.S_IFREG
            if isinstance(e, FuseArvadosFile):
                # Arvados data files additionally get the execute bits.
                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

        # Write bits only on a writable mount and a writable object.
        if self.enable_write and e.writable():
            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

        # NOTE(review): st_nlink assignment appears to be elided here.
        entry.st_uid = self.uid
        entry.st_gid = self.gid

        entry.st_size = e.size()

        entry.st_blksize = 512
        entry.st_blocks = (entry.st_size/512)+1
        if hasattr(entry, 'st_atime_ns'):
            # llfuse with nanosecond-resolution timestamp fields.
            entry.st_atime_ns = int(e.atime() * 1000000000)
            entry.st_mtime_ns = int(e.mtime() * 1000000000)
            entry.st_ctime_ns = int(e.mtime() * 1000000000)
        # NOTE(review): the 'else:' header for the older-llfuse branch below
        # appears to be elided.  Also note e.atime / e.mtime are used here
        # WITHOUT parentheses, unlike the calls in the branch above — this
        # looks like a bug (int() of a bound method); confirm.
            entry.st_atime = int(e.atime)
            entry.st_mtime = int(e.mtime)
            entry.st_ctime = int(e.mtime)

        # NOTE(review): the 'return entry' tail appears to be elided.
    def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
        """Apply attribute changes; only file size changes (truncate) are honored."""
        # Start from the current attributes.
        entry = self.getattr(inode)

        if fh is not None and fh in self._filehandles:
            handle = self._filehandles[fh]
            # NOTE(review): statements using the handle (and the alternative
            # path when no handle is supplied) appear to be elided here.
        e = self.inodes[inode]

        # On older llfuse, a size change is signalled by st_size being set;
        # newer llfuse passes an explicit 'fields' object instead.
        update_size = attr.st_size is not None
        # NOTE(review): the conditional ('if fields is not None:' or
        # similar) guarding the reassignment below appears to be elided.
        update_size = fields.update_size
        if update_size and isinstance(e, FuseArvadosFile):
            # Truncate with the llfuse lock released — presumably because
            # it can block on I/O; confirm.
            with llfuse.lock_released:
                e.arvfile.truncate(attr.st_size)
                entry.st_size = e.arvfile.size()

        # NOTE(review): the 'return entry' tail appears to be elided.
    def lookup(self, parent_inode, name, ctx=None):
        """Resolve *name* within *parent_inode* and return its attributes."""
        # llfuse hands us bytes; decode with the mount's encoding
        # (Python 2 'unicode' built-in).
        name = unicode(name, self.inodes.encoding)
        # NOTE(review): initialization of 'inode' and the '.' special case
        # appear to be elided in this copy.
        if parent_inode in self.inodes:
            p = self.inodes[parent_inode]
            # NOTE(review): the condition (presumably name == '..') for the
            # branch below appears to be elided.
            inode = p.parent_inode
            elif isinstance(p, Directory) and name in p:
                inode = p[name].inode

        # NOTE(review): the 'if inode:' guard around the success path below
        # appears to be elided in this copy.
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                          parent_inode, name, inode)
            # Take a kernel reference; released later via forget().
            self.inodes[inode].inc_ref()
            return self.getattr(inode)
        # NOTE(review): the 'else:' introducing the not-found path below
        # appears to be elided in this copy.
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
                          # NOTE(review): the remaining format arguments are
                          # elided in this copy.
            raise llfuse.FUSEError(errno.ENOENT)
    def forget(self, inodes):
        """Drop kernel references; delete entries that become dead and unreferenced."""
        # No-op once shutdown has begun, to avoid touching entries that
        # were already deleted (see comment in __init__).
        if self._shutdown_started.is_set():
            # NOTE(review): the early 'return' for this guard appears to be
            # elided in this copy.
        for inode, nlookup in inodes:
            ent = self.inodes[inode]
            _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
            # Release nlookup references; remove the entry once fully
            # unreferenced and already marked dead.
            if ent.dec_ref(nlookup) == 0 and ent.dead:
                self.inodes.del_entry(ent)
    def open(self, inode, flags, ctx=None):
        """Open a file inode and return a numeric file handle."""
        if inode in self.inodes:
            p = self.inodes[inode]
        # NOTE(review): the 'else:' for the unknown-inode case below appears
        # to be elided in this copy.
            raise llfuse.FUSEError(errno.ENOENT)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        # Reject write access to objects that are not writable.
        if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
            raise llfuse.FUSEError(errno.EPERM)

        fh = next(self._filehandles_counter)
        self._filehandles[fh] = FileHandle(fh, p)
        # NOTE(review): a touch of the inode appears to be elided here.

        # Normally, we will have received an "update" event if the
        # parent collection is stale here. However, even if the parent
        # collection hasn't changed, the manifest might have been
        # fetched so long ago that the signatures on the data block
        # locators have expired. Calling checkupdate() on all
        # ancestors ensures the signatures will be refreshed if
        # necessary.
        while p.parent_inode in self.inodes:
            if p == self.inodes[p.parent_inode]:
                # NOTE(review): the loop-exit for a self-parented root
                # appears to be elided in this copy.
            p = self.inodes[p.parent_inode]
            # NOTE(review): the per-ancestor checkupdate() call described
            # above appears to be elided in this copy.

        _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
        # NOTE(review): the 'return fh' tail appears to be elided.
    def read(self, fh, off, size):
        """Read up to *size* bytes at *off* from the file behind handle *fh*."""
        _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
        self.read_ops_counter.add(1)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        # NOTE(review): the 'else:' for the bad-handle case below appears
        # to be elided in this copy.
            raise llfuse.FUSEError(errno.EBADF)

        # Mark the file recently used in the inode cache.
        self.inodes.touch(handle.obj)

        r = handle.obj.readfrom(off, size, self.num_retries)
        # NOTE(review): a guard around the counter update and the 'return r'
        # tail appear to be elided in this copy.
        self.read_counter.add(len(r))
    def write(self, fh, off, buf):
        """Write *buf* at offset *off* into the file behind handle *fh*."""
        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
        self.write_ops_counter.add(1)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        # NOTE(review): the 'else:' for the bad-handle case below appears
        # to be elided in this copy.
            raise llfuse.FUSEError(errno.EBADF)

        if not handle.obj.writable():
            raise llfuse.FUSEError(errno.EPERM)

        # Mark the file recently used in the inode cache.
        self.inodes.touch(handle.obj)

        w = handle.obj.writeto(off, buf, self.num_retries)
        # NOTE(review): a guard around the counter update and the 'return w'
        # tail appear to be elided in this copy.
        self.write_counter.add(w)
    def release(self, fh):
        """Flush and discard the file handle *fh*."""
        if fh in self._filehandles:
            _logger.debug("arv-mount release fh %i", fh)
            # NOTE(review): a 'try:' guarding the flush appears to be elided
            # in this copy.
            self._filehandles[fh].flush()
            # NOTE(review): the matching 'except'/'finally' for the flush
            # appears to be elided in this copy.
            self._filehandles[fh].release()
            del self._filehandles[fh]
        # Object sizes may have changed; re-apply the inode cache cap.
        self.inodes.inode_cache.cap_cache()
    # NOTE(review): the body of releasedir (presumably discarding the
    # directory handle) is elided in this copy.
    def releasedir(self, fh):
    def opendir(self, inode, ctx=None):
        """Open a directory inode and return a numeric directory handle."""
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        # NOTE(review): the 'else:' for the unknown-inode case below appears
        # to be elided in this copy.
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = next(self._filehandles_counter)
        # The '..' entry needs the parent object.
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        # NOTE(review): the 'else:' for the missing-parent case below
        # appears to be elided in this copy.
            raise llfuse.FUSEError(errno.EIO)

        # NOTE(review): statements (and comments) between here and the
        # handle construction appear to be elided in this copy.

        # Snapshot the directory listing now so readdir() pages through a
        # stable sequence even if the directory changes afterwards.
        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
        # NOTE(review): the 'return fh' tail appears to be elided.
    def readdir(self, fh, off):
        """Yield (name, attributes, next_offset) tuples starting at *off*."""
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        # NOTE(review): the 'else:' for the bad-handle case below appears
        # to be elided in this copy.
            raise llfuse.FUSEError(errno.EBADF)

        # NOTE(review): the cursor initialization ('e = off' presumably)
        # appears to be elided in this copy.
        while e < len(handle.entries):
            # Skip entries whose objects have been removed since opendir().
            if handle.entries[e][1].inode in self.inodes:
                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
            # NOTE(review): the cursor increment appears to be elided in
            # this copy.
    def statfs(self, ctx=None):
        """Report filesystem statistics with a fixed nominal block size."""
        st = llfuse.StatvfsData()
        # NOTE(review): the remaining StatvfsData field assignments and the
        # 'return st' tail appear to be elided in this copy.
        st.f_bsize = 128 * 1024
    def _check_writable(self, inode_parent):
        """Resolve *inode_parent* to a writable Directory, or raise FUSEError.

        Used by all the mutating handlers (create/mkdir/unlink/rmdir/rename),
        which rely on its return value.
        """
        # The whole mount may be read-only.
        if not self.enable_write:
            raise llfuse.FUSEError(errno.EROFS)

        if inode_parent in self.inodes:
            p = self.inodes[inode_parent]
        # NOTE(review): the 'else:' for the unknown-inode case below appears
        # to be elided in this copy.
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        # NOTE(review): the writability guard for the raise below and the
        # final 'return p' appear to be elided in this copy.
            raise llfuse.FUSEError(errno.EPERM)
    def create(self, inode_parent, name, mode, flags, ctx=None):
        """Create *name* in the parent directory; return (fh, attributes)."""
        _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)

        p = self._check_writable(inode_parent)
        # NOTE(review): the call asking the parent directory to create the
        # file appears to be elided in this copy.

        # The file entry should have been implicitly created by callback.
        # NOTE(review): the lookup assigning 'f' (the new file object) and
        # its reference bump appear to be elided in this copy.
        fh = next(self._filehandles_counter)
        self._filehandles[fh] = FileHandle(fh, f)
        # NOTE(review): statements between here and the return appear to be
        # elided in this copy.

        return (fh, self.getattr(f.inode))
    def mkdir(self, inode_parent, name, mode, ctx=None):
        """Create subdirectory *name* in the parent directory; return its attributes."""
        _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)

        p = self._check_writable(inode_parent)
        # NOTE(review): the call asking the parent directory to create the
        # subdirectory appears to be elided in this copy.

        # The dir entry should have been implicitly created by callback.
        # NOTE(review): the lookup assigning 'd' (the new directory object)
        # and its reference bump appear to be elided in this copy.

        return self.getattr(d.inode)
    def unlink(self, inode_parent, name, ctx=None):
        """Remove file *name* from the parent directory."""
        _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
        p = self._check_writable(inode_parent)
        # NOTE(review): the delegation to the parent directory (removing
        # 'name') appears to be elided in this copy.
    def rmdir(self, inode_parent, name, ctx=None):
        """Remove subdirectory *name* from the parent directory."""
        _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
        p = self._check_writable(inode_parent)
        # NOTE(review): the delegation to the parent directory (removing
        # the subdirectory) appears to be elided in this copy.
818 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
819 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
820 src = self._check_writable(inode_parent_old)
821 dest = self._check_writable(inode_parent_new)
822 dest.rename(name_old, name_new, src)
    # NOTE(review): the 'def flush(self, fh):' header (and its timing
    # decorator) appears to be elided in this copy; the statements below
    # flush the handle if it is still open.
        if fh in self._filehandles:
            self._filehandles[fh].flush()
830 def fsync(self, fh, datasync):
833 def fsyncdir(self, fh, datasync):