1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before returning a result.
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
77 # Default _notify_queue has a limit of 1000 items, but it really needs to be
78 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# NOTE(review): the tail of this comment (original line 79) is elided in
# this listing.
# Swap llfuse's bounded notify queue for an unbounded Queue.Queue (this
# is Python 2 code); the attribute lives in llfuse.capi on some llfuse
# versions and directly on llfuse on others.
81 if hasattr(llfuse, 'capi'):
83 llfuse.capi._notify_queue = Queue.Queue()
# NOTE(review): the `else:` pairing the fallback below with the branch
# above (original lines 84-85) is elided.
86 llfuse._notify_queue = Queue.Queue()
# True when running against the llfuse 0.x API; used later to decide
# whether destroy() must take the global llfuse lock itself.
88 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
90 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
91 from fusefile import StringFile, FuseArvadosFile
93 _logger = logging.getLogger('arvados.arvados_fuse')
95 # Uncomment this to enable llfuse debug logging.
96 # log_handler = logging.StreamHandler()
97 # llogger = logging.getLogger('llfuse')
98 # llogger.addHandler(log_handler)
99 # llogger.setLevel(logging.DEBUG)
101 class Handle(object):
102 """Connects a numeric file handle to a File or Directory object that has
103 been opened by the client."""
# NOTE(review): the body of __init__ (original lines 106+) and any
# release()/flush() helpers are elided in this listing; presumably it
# stores fh and obj and bumps the object's use count, since subclasses
# read self.obj — confirm against the full source.
105 def __init__(self, fh, obj):
117 class FileHandle(Handle):
118 """Connects a numeric file handle to a File object that has
119 been opened by the client."""
# NOTE(review): the `def flush(self):` header (original line ~121) is
# elided; the two lines below are its body.  Only writable files are
# flushed; read-only handles fall through with no work.
122 if self.obj.writable():
123 return self.obj.flush()
class DirectoryHandle(Handle):
    """Pairs a numeric FUSE file handle with an open Directory.

    ``entries`` holds the directory listing captured when the handle was
    opened; readdir() later iterates over this snapshot rather than the
    live directory.
    """

    def __init__(self, fh, dirobj, entries):
        # Let the base class record the handle number and directory
        # object, then keep the listing snapshot for readdir().
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
135 class InodeCache(object):
136 """Records the memory footprint of objects and when they are last used.
138 When the cache limit is exceeded, the least recently used objects are
139 cleared. Clearing the object means discarding its contents to release
140 memory. The next time the object is accessed, it must be re-fetched from
141 the server. Note that the inode cache limit is a soft limit; the cache
142 limit may be exceeded if necessary to load very large objects, it may also
143 be exceeded if open file handles prevent objects from being cleared.
# NOTE(review): this listing elides many original lines (the embedded
# line numbers below are discontinuous); elisions are flagged inline.
147 def __init__(self, cap, min_entries=4):
148 self._entries = collections.OrderedDict()
# NOTE(review): initialization of self._by_uuid, self._total and
# self.cap is elided here; later methods read all three.
152 self.min_entries = min_entries
# Evict one object.  `clear` selects hard eviction (discard contents and
# notify the kernel) vs. the soft removal used by touch().
157 def _remove(self, obj, clear):
159 # Kernel behavior seems to be that if a file is
160 # referenced, its parents remain referenced too. This
161 # means has_ref() exits early when a collection is not
162 # candidate for eviction.
164 # By contrast, in_use() doesn't increment references on
165 # parents, so it requires a full tree walk to determine if
166 # a collection is a candidate for eviction. This takes
167 # .07s for 240000 files, which becomes a major drag when
168 # cap_cache is being called several times a second and
169 # there are multiple non-evictable collections in the
172 # So it is important for performance that we do the
173 # has_ref() check first.
175 if obj.has_ref(True):
176 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
# NOTE(review): the early `return` after the debug above, an
# `if obj.in_use():` guard before the next line, and the `if clear:`
# wrapper around the invalidate below appear to be elided — confirm
# against the full source.
180 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
183 obj.kernel_invalidate()
184 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
187 # The llfuse lock is released in del_entry(), which is called by
188 # Directory.clear(). While the llfuse lock is released, it can happen
189 # that a reentrant call removes this entry before this call gets to it.
190 # Ensure that the entry is still valid before trying to remove it.
191 if obj.inode not in self._entries:
# NOTE(review): the `return` body of this guard is elided.
194 self._total -= obj.cache_size
195 del self._entries[obj.inode]
# NOTE(review): an `if obj.cache_uuid:` guard around the uuid-index
# maintenance below appears to be elided.
197 self._by_uuid[obj.cache_uuid].remove(obj)
198 if not self._by_uuid[obj.cache_uuid]:
199 del self._by_uuid[obj.cache_uuid]
200 obj.cache_uuid = None
202 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
# NOTE(review): the `def cap_cache(self):` header is elided before the
# next line; this code evicts least-recently-used entries (OrderedDict
# iteration order) until the total drops under the soft cap, always
# keeping at least min_entries.
205 if self._total > self.cap:
206 for ent in self._entries.values():
207 if self._total < self.cap or len(self._entries) < self.min_entries:
# NOTE(review): the `break` under this condition is elided.
209 self._remove(ent, True)
# Start tracking obj: record its size, index by inode and by uuid.
211 def manage(self, obj):
# NOTE(review): an `if obj.persisted():` guard appears to be elided
# here (unmanage() checks persisted() symmetrically below).
213 obj.cache_size = obj.objsize()
214 self._entries[obj.inode] = obj
215 obj.cache_uuid = obj.uuid()
# NOTE(review): an `if obj.cache_uuid:` guard is likely elided here.
217 if obj.cache_uuid not in self._by_uuid:
218 self._by_uuid[obj.cache_uuid] = [obj]
# NOTE(review): the `else:` between this branch and the next appears
# to be elided.
220 if obj not in self._by_uuid[obj.cache_uuid]:
221 self._by_uuid[obj.cache_uuid].append(obj)
222 self._total += obj.objsize()
223 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
224 obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
# Mark obj most-recently-used: soft-remove, then (in elided lines)
# presumably re-add via manage() — confirm against the full source.
227 def touch(self, obj):
229 if obj.inode in self._entries:
230 self._remove(obj, False)
# Stop tracking obj and discard its cached contents.
233 def unmanage(self, obj):
234 if obj.persisted() and obj.inode in self._entries:
235 self._remove(obj, True)
# Return all live cached objects for an Arvados uuid ([] if none).
237 def find_by_uuid(self, uuid):
238 return self._by_uuid.get(uuid, [])
# NOTE(review): the `def clear(self):` header is elided before the two
# lines below, which drop all cache state.
241 self._entries.clear()
242 self._by_uuid.clear()
245 class Inodes(object):
246 """Manage the set of inodes. This is the mapping from a numeric id
247 to a concrete File or Directory object"""
249 def __init__(self, inode_cache, encoding="utf-8"):
# NOTE(review): initialization of self._entries (a dict keyed by inode
# number) is elided here.
251 self._counter = itertools.count(llfuse.ROOT_INODE)
252 self.inode_cache = inode_cache
253 self.encoding = encoding
254 self.deferred_invalidations = []
256 def __getitem__(self, item):
257 return self._entries[item]
259 def __setitem__(self, key, item):
260 self._entries[key] = item
# NOTE(review): the `def __iter__(self):` header is elided before the
# next line (`iterkeys()` confirms this is Python 2 code).
263 return self._entries.iterkeys()
# NOTE(review): the `def items(self):` header is elided before the
# next line.
266 return self._entries.items()
268 def __contains__(self, k):
269 return k in self._entries
# Record a use of `entry` for LRU accounting.
271 def touch(self, entry):
272 entry._atime = time.time()
273 self.inode_cache.touch(entry)
# Assign the next inode number to `entry` and start tracking it.
275 def add_entry(self, entry):
276 entry.inode = next(self._counter)
277 if entry.inode == llfuse.ROOT_INODE:
# NOTE(review): the body of this root-inode branch (original line 278)
# is elided — confirm against the full source.
279 self._entries[entry.inode] = entry
280 self.inode_cache.manage(entry)
# NOTE(review): a `return entry` at the end of add_entry appears to be
# elided (original lines 281-282).
# Drop `entry` once the kernel holds no more references to it; an
# `else:` logging branch pairs with the debug line at the bottom.
283 def del_entry(self, entry):
284 if entry.ref_count == 0:
285 self.inode_cache.unmanage(entry)
286 del self._entries[entry.inode]
287 with llfuse.lock_released:
# NOTE(review): the finalize call under this `with` (original lines
# 288-290) and the `else:` pairing the debug line below are elided.
292 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
# Ask the kernel to drop cached attributes for this inode.
294 def invalidate_inode(self, entry):
295 if entry.has_ref(False):
296 # Only necessary if the kernel has previously done a lookup on this
297 # inode and hasn't yet forgotten about it.
298 llfuse.invalidate_inode(entry.inode)
# Ask the kernel to drop one cached directory entry (`name` under the
# directory `entry`); the name is encoded to bytes for llfuse.
300 def invalidate_entry(self, entry, name):
301 if entry.has_ref(False):
302 # Only necessary if the kernel has previously done a lookup on this
303 # inode and hasn't yet forgotten about it.
304 llfuse.invalidate_entry(entry.inode, name.encode(self.encoding))
# NOTE(review): the `def clear(self):` header is elided before the next
# line; this code tears down the cache and finalizes every object.
307 self.inode_cache.clear()
309 for k,v in self._entries.items():
# NOTE(review): the `try:` and the finalize call it wraps (original
# lines 310-311) are elided here.
312 except Exception as e:
313 _logger.exception("Error during finalize of inode %i", k)
315 self._entries.clear()
# Decorator for llfuse request handlers: translate every uncaught
# exception into an errno-carrying llfuse.FUSEError so the failure is
# reported to the kernel instead of killing the FUSE service thread.
318 def catch_exceptions(orig_func):
319 """Catch uncaught exceptions and log them consistently."""
321 @functools.wraps(orig_func)
322 def catch_exceptions_wrapper(self, *args, **kwargs):
# NOTE(review): the `try:` line wrapping the call below is elided in
# this listing.
324 return orig_func(self, *args, **kwargs)
325 except llfuse.FUSEError:
# A FUSEError already carries the right errno; the bare `raise`
# re-raising it under this handler is elided here.
327 except EnvironmentError as e:
328 raise llfuse.FUSEError(e.errno)
329 except arvados.errors.KeepWriteError as e:
330 _logger.error("Keep write error: " + str(e))
331 raise llfuse.FUSEError(errno.EIO)
332 except arvados.errors.NotFoundError as e:
333 _logger.error("Block not found error: " + str(e))
334 raise llfuse.FUSEError(errno.EIO)
# NOTE(review): the catch-all `except:` header (original line 335) is
# elided before the next line.
336 _logger.exception("Unhandled exception during FUSE operation")
337 raise llfuse.FUSEError(errno.EIO)
339 return catch_exceptions_wrapper
342 class Operations(llfuse.Operations):
343 """This is the main interface with llfuse.
345 The methods on this object are called by llfuse threads to service FUSE
346 events to query and read from the file system.
348 llfuse has its own global lock which is acquired before calling a request handler,
349 so request handlers do not run concurrently unless the lock is explicitly released
350 using 'with llfuse.lock_released:'
354 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
355 super(Operations, self).__init__()
357 self._api_client = api_client
# NOTE(review): `self.uid = uid` / `self.gid = gid` assignments and an
# `if not inode_cache:` guard for the default below are elided here;
# getattr() reads self.uid/self.gid, so they must be set somewhere.
360 inode_cache = InodeCache(cap=256*1024*1024)
361 self.inodes = Inodes(inode_cache, encoding=encoding)
364 self.enable_write = enable_write
366 # dict of inode to filehandle
367 self._filehandles = {}
368 self._filehandles_counter = itertools.count(0)
370 # Other threads that need to wait until the fuse driver
371 # is fully initialized should wait() on this event object.
372 self.initlock = threading.Event()
374 # If we get overlapping shutdown events (e.g., fusermount -u
375 # -z and operations.destroy()) llfuse calls forget() on inodes
376 # that have already been deleted. To avoid this, we make
377 # forget() a no-op if called after destroy().
378 self._shutdown_started = threading.Event()
380 self.num_retries = num_retries
# Byte and operation counters, exposed for mount statistics.
382 self.read_counter = arvados.keep.Counter()
383 self.write_counter = arvados.keep.Counter()
384 self.read_ops_counter = arvados.keep.Counter()
385 self.write_ops_counter = arvados.keep.Counter()
# NOTE(review): the `def init(self):` header is elided before the
# comment below; its body presumably sets self.initlock.
390 # Allow threads that are waiting for the driver to be finished
391 # initializing to continue
# NOTE(review): the `def destroy(self):` header (and any decorator) is
# elided before the next line.
396 self._shutdown_started.set()
401 # Different versions of llfuse require and forbid us to
402 # acquire the lock here. See #8345#note-37, #10805#note-9.
403 if LLFUSE_VERSION_0 and llfuse.lock.acquire():
# NOTE(review): the teardown between acquire and release (original
# lines 404-405, presumably clearing self.inodes) is elided.
406 llfuse.lock.release()
# FUSE access handler; its body (original lines 412+) is elided.  The
# @catch_exceptions decorator lines on the handlers in this class are
# also elided throughout this listing.
411 def access(self, inode, mode, ctx):
# Subscribe to the Arvados event bus so live cached objects are
# refreshed as soon as the underlying records change.
414 def listen_for_events(self):
415 self.events = arvados.events.subscribe(
# NOTE(review): the api-client argument (original line 416) preceding
# this filter list, and the callback argument / closing paren after it
# (original lines 418+), are elided.
417 [["event_type", "in", ["create", "update", "delete"]]],
# Event-bus callback: update any cached object named by the event.
421 def on_event(self, ev):
422 if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
# NOTE(review): the early `return` for this guard and a
# `with llfuse.lock:` wrapper around the body below appear to be
# elided — confirm against the full source.
425 properties = ev.get("properties") or {}
426 old_attrs = properties.get("old_attributes") or {}
427 new_attrs = properties.get("new_attributes") or {}
429 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
# NOTE(review): an invalidation call on `item` (original line 430) is
# elided before this branch.
431 if ev.get("object_kind") == "arvados#collection":
432 pdh = new_attrs.get("portable_data_hash")
433 # new_attributes.modified_at currently lacks
434 # subsecond precision (see #6347) so use event_at
435 # which should always be the same.
436 stamp = ev.get("event_at")
437 if (stamp and pdh and item.writable() and
438 item.collection is not None and
439 item.collection.modified() and
440 new_attrs.get("is_trashed") is not True):
441 item.update(to_record_version=(stamp, pdh))
# Wake the old and new owning projects too, so their directory
# listings reflect the ownership change.
443 oldowner = old_attrs.get("owner_uuid")
444 newowner = ev.get("object_owner_uuid")
# NOTE(review): the `for parent in (` line opening this loop (original
# line 445) is elided.
446 self.inodes.inode_cache.find_by_uuid(oldowner) +
447 self.inodes.inode_cache.find_by_uuid(newowner)):
448 parent.child_event(ev)
# FUSE getattr: build an llfuse.EntryAttributes for `inode`.
451 def getattr(self, inode, ctx=None):
452 if inode not in self.inodes:
453 raise llfuse.FUSEError(errno.ENOENT)
455 e = self.inodes[inode]
457 entry = llfuse.EntryAttributes()
# NOTE(review): assignments of entry.st_ino / generation (original
# lines 458-459) are elided here.
460 entry.entry_timeout = 0
# Let the kernel cache attributes only until the object's next poll.
461 entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
# Everything is world-readable; directories get the search (execute)
# bits plus S_IFDIR, regular Arvados files also get execute bits so
# scripts stored in collections can be run directly.
463 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
464 if isinstance(e, Directory):
465 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
# NOTE(review): the `else:` pairing the S_IFREG line with the
# directory branch above is elided.
467 entry.st_mode |= stat.S_IFREG
468 if isinstance(e, FuseArvadosFile):
469 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
# Write bits appear only when both the mount and the object allow it.
471 if self.enable_write and e.writable():
472 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
475 entry.st_uid = self.uid
476 entry.st_gid = self.gid
479 entry.st_size = e.size()
481 entry.st_blksize = 512
482 entry.st_blocks = (entry.st_size/512)+1
# Newer llfuse exposes nanosecond timestamp fields; otherwise fall
# back to the whole-second attributes.
483 if hasattr(entry, 'st_atime_ns'):
485 entry.st_atime_ns = int(e.atime() * 1000000000)
486 entry.st_mtime_ns = int(e.mtime() * 1000000000)
487 entry.st_ctime_ns = int(e.mtime() * 1000000000)
# NOTE(review): the `else:` for the hasattr branch is elided.  Also
# note `e.atime` / `e.mtime` below lack the call parentheses used on
# line 485 (`e.atime()`); if accurate, these would set the timestamp
# to int() of a bound method and raise — verify against the full
# source.  The final `return entry` is elided after line 492.
490 entry.st_atime = int(e.atime)
491 entry.st_mtime = int(e.mtime)
492 entry.st_ctime = int(e.mtime)
# FUSE setattr: only size changes (truncate) are honored, and only for
# FuseArvadosFile objects.
497 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
498 entry = self.getattr(inode)
500 if fh is not None and fh in self._filehandles:
501 handle = self._filehandles[fh]
# NOTE(review): the `e = handle.obj` body of the branch above and the
# `else:` before the next line are elided.
504 e = self.inodes[inode]
# llfuse 0.x signals changed fields via non-None attr values; newer
# llfuse passes an explicit SetattrFields object in `fields`.  The
# `if fields is None:` / `else:` structure around these assignments is
# elided in this listing.
508 update_size = attr.st_size is not None
511 update_size = fields.update_size
512 if update_size and isinstance(e, FuseArvadosFile):
# Release the global llfuse lock during the (possibly slow) truncate.
513 with llfuse.lock_released:
514 e.arvfile.truncate(attr.st_size)
515 entry.st_size = e.arvfile.size()
# NOTE(review): the final `return entry` (original lines 516+) is
# elided here.
# FUSE lookup: resolve `name` under parent_inode; on success bump the
# kernel-side reference count and return the entry's attributes.
520 def lookup(self, parent_inode, name, ctx=None):
# llfuse hands us bytes; decode with the mount's encoding (Python 2
# `unicode`).
521 name = unicode(name, self.inodes.encoding)
# NOTE(review): initialization of the local `inode` and the handling
# of the '.' name (original lines 522-526) are elided here.
527 if parent_inode in self.inodes:
528 p = self.inodes[parent_inode]
# NOTE(review): the `if name == '..':` test pairing with the next line
# (original lines 529-530) is elided.
531 inode = p.parent_inode
532 elif isinstance(p, Directory) and name in p:
533 inode = p[name].inode
# NOTE(review): the `if inode != None:` guard (original lines 534-535)
# before the success path below is elided.
536 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
537 parent_inode, name, inode)
# inc_ref() mirrors the kernel's lookup count; forget() undoes it.
538 self.inodes[inode].inc_ref()
539 return self.getattr(inode)
# NOTE(review): the `else:` opening the not-found path (original line
# 540) and the debug call's argument line (original line 542) are
# elided around the two lines below.
541 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
543 raise llfuse.FUSEError(errno.ENOENT)
# FUSE forget: the kernel is dropping nlookup references per inode;
# delete entries that reach zero references and are marked dead.
546 def forget(self, inodes):
# No-op after destroy() has begun -- see the _shutdown_started comment
# in __init__.  NOTE(review): the early `return` body of this guard is
# elided.
547 if self._shutdown_started.is_set():
549 for inode, nlookup in inodes:
550 ent = self.inodes[inode]
551 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
552 if ent.dec_ref(nlookup) == 0 and ent.dead:
553 self.inodes.del_entry(ent)
# FUSE open: allocate a FileHandle for a regular file.
556 def open(self, inode, flags, ctx=None):
557 if inode in self.inodes:
558 p = self.inodes[inode]
# NOTE(review): the `else:` pairing the ENOENT below with the lookup
# above (original line 559) is elided.
560 raise llfuse.FUSEError(errno.ENOENT)
562 if isinstance(p, Directory):
563 raise llfuse.FUSEError(errno.EISDIR)
# Reject write-mode opens of read-only objects up front.
565 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
566 raise llfuse.FUSEError(errno.EPERM)
568 fh = next(self._filehandles_counter)
569 self._filehandles[fh] = FileHandle(fh, p)
# NOTE(review): a touch of `p` (original line 570) is likely elided
# before the comment below — confirm against the full source.
572 # Normally, we will have received an "update" event if the
573 # parent collection is stale here. However, even if the parent
574 # collection hasn't changed, the manifest might have been
575 # fetched so long ago that the signatures on the data block
576 # locators have expired. Calling checkupdate() on all
577 # ancestors ensures the signatures will be refreshed if
# NOTE(review): the tail of this comment (original line 578) is
# elided.
579 while p.parent_inode in self.inodes:
# Stop at the root, which is its own parent.
580 if p == self.inodes[p.parent_inode]:
# NOTE(review): the `break` under this test and the checkupdate()
# call(s) inside the loop (original lines 581, 583-585) are elided.
582 p = self.inodes[p.parent_inode]
586 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
# NOTE(review): the final `return fh` (original lines 587+) is elided.
# FUSE read: return up to `size` bytes starting at offset `off`.
591 def read(self, fh, off, size):
592 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
593 self.read_ops_counter.add(1)
595 if fh in self._filehandles:
596 handle = self._filehandles[fh]
# NOTE(review): the `else:` pairing the EBADF below with the lookup
# above (original line 597) is elided.
598 raise llfuse.FUSEError(errno.EBADF)
600 self.inodes.touch(handle.obj)
602 r = handle.obj.readfrom(off, size, self.num_retries)
# NOTE(review): a guard around the counter update and the final
# `return r` (original lines 603, 605+) are elided.
604 self.read_counter.add(len(r))
# FUSE write: write `buf` at offset `off`; reports bytes written.
608 def write(self, fh, off, buf):
609 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
610 self.write_ops_counter.add(1)
612 if fh in self._filehandles:
613 handle = self._filehandles[fh]
# NOTE(review): the `else:` pairing the EBADF below with the lookup
# above (original line 614) is elided.
615 raise llfuse.FUSEError(errno.EBADF)
617 if not handle.obj.writable():
618 raise llfuse.FUSEError(errno.EPERM)
620 self.inodes.touch(handle.obj)
622 w = handle.obj.writeto(off, buf, self.num_retries)
# NOTE(review): a guard around the counter update and the final
# `return w` (original lines 623, 625+) are elided.
624 self.write_counter.add(w)
# FUSE release: flush and discard the file handle, then trim the
# inode cache now that the handle no longer pins its object.
628 def release(self, fh):
629 if fh in self._filehandles:
630 _logger.debug("arv-mount release fh %i", fh)
# NOTE(review): a `try:`/`except Exception:` wrapper around flush()
# (original lines 631, 633-635) appears to be elided -- errors raised
# from release are not reported back to the kernel.
632 self._filehandles[fh].flush()
636 self._filehandles[fh].release()
637 del self._filehandles[fh]
638 self.inodes.inode_cache.cap_cache()
# FUSE releasedir: directory handles reuse release().  NOTE(review):
# the body (original lines 641+, presumably `self.release(fh)`) is
# elided.
640 def releasedir(self, fh):
# FUSE opendir: snapshot the directory listing into a DirectoryHandle
# so readdir can iterate it statelessly by offset.
644 def opendir(self, inode, ctx=None):
645 _logger.debug("arv-mount opendir: inode %i", inode)
647 if inode in self.inodes:
648 p = self.inodes[inode]
# NOTE(review): the `else:` pairing the ENOENT below with the lookup
# above (original line 649) is elided.
650 raise llfuse.FUSEError(errno.ENOENT)
652 if not isinstance(p, Directory):
653 raise llfuse.FUSEError(errno.ENOTDIR)
655 fh = next(self._filehandles_counter)
656 if p.parent_inode in self.inodes:
657 parent = self.inodes[p.parent_inode]
# NOTE(review): the `else:` for a missing parent (original line 658)
# is elided before the EIO below.
659 raise llfuse.FUSEError(errno.EIO)
# '.' and '..' are synthesized into the snapshot so readdir serves
# them like ordinary entries.  NOTE(review): original lines 660-663
# (likely a refresh step and a comment) are elided, as is the final
# `return fh`.
664 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
# FUSE readdir: generator yielding (name-bytes, attrs, next-offset)
# from the snapshot taken at opendir time, starting at offset `off`.
668 def readdir(self, fh, off):
669 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
671 if fh in self._filehandles:
672 handle = self._filehandles[fh]
# NOTE(review): the `else:` pairing the EBADF below with the lookup
# above (original line 673) is elided.
674 raise llfuse.FUSEError(errno.EBADF)
# NOTE(review): the `e = off` initialization (original lines 675-676)
# is elided before this loop.
677 while e < len(handle.entries):
# Skip entries whose inode was deleted after the snapshot was taken.
678 if handle.entries[e][1].inode in self.inodes:
679 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
# NOTE(review): the `e += 1` loop increment (original lines 680-682)
# is elided.
# FUSE statfs: report nominal filesystem statistics (the mount has no
# real block device).
683 def statfs(self, ctx=None):
684 st = llfuse.StatvfsData()
685 st.f_bsize = 128 * 1024
# NOTE(review): the remaining StatvfsData fields and the `return st`
# (original lines 686-696) are elided here.
# Shared precondition for every mutating operation: the mount must be
# writable and inode_parent must name an existing, writable Directory.
# Returns that Directory (the `return p` is elided in this listing).
698 def _check_writable(self, inode_parent):
699 if not self.enable_write:
700 raise llfuse.FUSEError(errno.EROFS)
702 if inode_parent in self.inodes:
703 p = self.inodes[inode_parent]
# NOTE(review): the `else:` pairing the ENOENT below with the lookup
# above (original line 704) is elided.
705 raise llfuse.FUSEError(errno.ENOENT)
707 if not isinstance(p, Directory):
708 raise llfuse.FUSEError(errno.ENOTDIR)
# NOTE(review): the `if not p.writable():` guard (original lines
# 709-710) before the EPERM below, and the `return p`, are elided.
711 raise llfuse.FUSEError(errno.EPERM)
# FUSE create: make a new file under inode_parent and return an open
# handle plus its attributes.
716 def create(self, inode_parent, name, mode, flags, ctx=None):
717 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
719 p = self._check_writable(inode_parent)
# NOTE(review): the call creating the file on `p` (original lines
# 720-721) and the `f = p[name]` / inc_ref lines (original line 723)
# are elided around the comment below.
722 # The file entry should have been implicitly created by callback.
724 fh = next(self._filehandles_counter)
725 self._filehandles[fh] = FileHandle(fh, f)
# NOTE(review): bookkeeping before the return (original lines 726-728)
# is elided.
729 return (fh, self.getattr(f.inode))
# FUSE mkdir: make a new subdirectory under inode_parent.
732 def mkdir(self, inode_parent, name, mode, ctx=None):
733 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
735 p = self._check_writable(inode_parent)
# NOTE(review): the call creating the directory on `p` (original lines
# 736-737) and the `d = p[name]` / inc_ref lines (original lines
# 739-741) are elided around the comment below.
738 # The dir entry should have been implicitly created by callback.
742 return self.getattr(d.inode)
# FUSE unlink: remove a file from its parent directory.
745 def unlink(self, inode_parent, name, ctx=None):
746 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
747 p = self._check_writable(inode_parent)
# NOTE(review): the delegation to `p` (original lines 748+) is elided.
# FUSE rmdir: remove a subdirectory.
751 def rmdir(self, inode_parent, name, ctx=None):
752 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
753 p = self._check_writable(inode_parent)
# NOTE(review): the delegation to `p` (original lines 754+) is elided.
# FUSE rename: both parent directories must pass the writability
# checks; the destination directory performs the move.
757 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
758 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
759 src = self._check_writable(inode_parent_old)
760 dest = self._check_writable(inode_parent_new)
761 dest.rename(name_old, name_new, src)
# NOTE(review): the `def flush(self, fh, ...)` header (original lines
# 763-764) is elided before the next two lines, which are its body:
# flush the handle's object if the handle is still open.
765 if fh in self._filehandles:
766 self._filehandles[fh].flush()
768 def fsync(self, fh, datasync):
771 def fsyncdir(self, fh, datasync):