1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
# Replace llfuse's notify queue with an unbounded Queue.Queue(): on llfuse
# builds that expose a `capi` submodule the queue is set there, otherwise
# (presumably) on the llfuse module itself.
77 # Default _notify_queue has a limit of 1000 items, but it really needs to be
78 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# NOTE(review): the end of the comment above and the `else:` pairing the two
# assignments below are not present in this copy; as written here both
# assignments would run -- confirm against the upstream file.
81 if hasattr(llfuse, 'capi'):
83 llfuse.capi._notify_queue = Queue.Queue()
86 llfuse._notify_queue = Queue.Queue()
# True for llfuse 0.x releases. The shutdown code further down checks it
# before calling llfuse.lock.acquire(), because llfuse versions differ on
# whether the lock must be taken there.
88 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
90 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
91 from fusefile import StringFile, FuseArvadosFile
# Logger shared by the inode cache, the inode table and the FUSE request
# handlers in this module.
93 _logger = logging.getLogger('arvados.arvados_fuse')
95 # Uncomment this to enable llfuse debug logging.
96 # log_handler = logging.StreamHandler()
97 # llogger = logging.getLogger('llfuse')
98 # llogger.addHandler(log_handler)
99 # llogger.setLevel(logging.DEBUG)
# Base class for open file/directory handles. Operations keeps instances in
# its _filehandles dict, keyed by handle number.
101 class Handle(object):
102 """Connects a numeric file handle to a File or Directory object that has
103 been opened by the client."""
105 def __init__(self, fh, obj):
# NOTE(review): the body of __init__, and the release()/flush() methods that
# Operations calls on handles, are not present in this copy.
# Handle for an open file; created by Operations.open() and create().
117 class FileHandle(Handle):
118 """Connects a numeric file handle to a File object that has
119 been opened by the client."""
# Flush only when the underlying file object is writable, returning the
# object's flush() result.
# NOTE(review): the enclosing `def` line (presumably `def flush(self):`,
# which Operations.release() and Operations.flush() call) is missing from
# this copy.
122 if self.obj.writable():
123 return self.obj.flush()
class DirectoryHandle(Handle):
    """Handle for an open directory.

    Besides the handle number and directory object (both forwarded to
    Handle.__init__), it keeps `entries`: the (name, object) listing built by
    Operations.opendir(), which readdir() walks by index.
    """

    def __init__(self, fh, dirobj, entries):
        # Delegate the handle number and directory object to the base class.
        super(DirectoryHandle, self).__init__(fh=fh, obj=dirobj)
        # Listing captured at opendir() time.
        self.entries = entries
# Size-bounded bookkeeping for inode objects: records each object's size and
# clears objects when the recorded total exceeds `cap`.
135 class InodeCache(object):
136 """Records the memory footprint of objects and when they are last used.
138 When the cache limit is exceeded, the least recently used objects are
139 cleared. Clearing the object means discarding its contents to release
140 memory. The next time the object is accessed, it must be re-fetched from
141 the server. Note that the inode cache limit is a soft limit; the cache
142 limit may be exceeded if necessary to load very large objects, it may also
143 be exceeded if open file handles prevent objects from being cleared.
# cap: soft size limit compared against self._total in the eviction loop.
# min_entries: eviction stops once fewer entries than this remain.
# _entries maps inode -> object in insertion order (OrderedDict), so the
# eviction loop examines the oldest insertions first.
# NOTE(review): the initialisation of self.cap, self._total and
# self._by_uuid (all used below) is not present in this copy.
147 def __init__(self, cap, min_entries=4):
148 self._entries = collections.OrderedDict()
152 self.min_entries = min_entries
# Remove obj from the bookkeeping: subtract its recorded cache_size from
# _total, delete it from _entries and from the _by_uuid index, and reset
# obj.cache_uuid. When `clear` is true the object may also be cleared and
# the kernel asked to invalidate it (kernel_invalidate()).
# NOTE(review): several lines of this method (the in-use check, the clear
# itself, early returns) are missing from this copy, so the exact
# conditions can't be confirmed here.
157 def _remove(self, obj, clear):
160 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
162 obj.kernel_invalidate()
163 if obj.has_ref(True):
164 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
168 # The llfuse lock is released in del_entry(), which is called by
169 # Directory.clear(). While the llfuse lock is released, it can happen
170 # that a reentrant call removes this entry before this call gets to it.
171 # Ensure that the entry is still valid before trying to remove it.
172 if obj.inode not in self._entries:
175 self._total -= obj.cache_size
176 del self._entries[obj.inode]
178 self._by_uuid[obj.cache_uuid].remove(obj)
179 if not self._by_uuid[obj.cache_uuid]:
180 del self._by_uuid[obj.cache_uuid]
181 obj.cache_uuid = None
183 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
# Evict entries (clear=True), oldest first, until _total falls below cap or
# fewer than min_entries remain (the `break` after the check is missing in
# this copy).
# NOTE(review): the `def` line (presumably `def cap_cache(self):`, called by
# Operations.release()) is missing here. Iterating self._entries.values()
# while _remove() deletes from self._entries is only safe on Python 2, where
# values() returns a list; Python 3 would need list(...) around it.
186 if self._total > self.cap:
187 for ent in self._entries.values():
188 if self._total < self.cap or len(self._entries) < self.min_entries:
190 self._remove(ent, True)
# Start tracking obj: store its size in obj.cache_size, insert it into
# _entries, index it by obj.uuid() in _by_uuid and add its size to _total.
# NOTE(review): _total is increased by a second objsize() call while
# _remove() subtracts the stored obj.cache_size; the two agree only if
# objsize() is stable between calls. Some lines (e.g. the `else:` before the
# append branch) are missing from this copy.
192 def manage(self, obj):
194 obj.cache_size = obj.objsize()
195 self._entries[obj.inode] = obj
196 obj.cache_uuid = obj.uuid()
198 if obj.cache_uuid not in self._by_uuid:
199 self._by_uuid[obj.cache_uuid] = [obj]
201 if obj not in self._by_uuid[obj.cache_uuid]:
202 self._by_uuid[obj.cache_uuid].append(obj)
203 self._total += obj.objsize()
204 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
# Refresh obj's position: remove it without clearing (the re-add that
# presumably follows is missing from this copy).
207 def touch(self, obj):
209 if obj.inode in self._entries:
210 self._remove(obj, False)
# Stop tracking obj and clear it, but only if obj.persisted() is true and it
# is currently cached.
213 def unmanage(self, obj):
214 if obj.persisted() and obj.inode in self._entries:
215 self._remove(obj, True)
# Return the cached objects with the given uuid (empty list if none).
217 def find_by_uuid(self, uuid):
218 return self._by_uuid.get(uuid, [])
# NOTE(review): the `def` line for the two statements below (presumably
# `def clear(self):`, called from Inodes.clear()) is missing. In the lines
# shown, _total is not reset along with _entries and _by_uuid.
221 self._entries.clear()
222 self._by_uuid.clear()
# Inode table: maps inode numbers to File/Directory objects and keeps the
# InodeCache informed as entries are added, touched and removed.
225 class Inodes(object):
226 """Manage the set of inodes. This is the mapping from a numeric id
227 to a concrete File or Directory object"""
# inode_cache: the InodeCache notified by touch()/add_entry()/del_entry().
# encoding: used to encode names passed to the kernel (invalidate_entry()).
# Inode numbers are allocated sequentially from llfuse.ROOT_INODE, so the
# first entry added receives the root inode number.
# NOTE(review): the assignment of self._entries (the dict used by every
# method below) is not present in this copy.
229 def __init__(self, inode_cache, encoding="utf-8"):
231 self._counter = itertools.count(llfuse.ROOT_INODE)
232 self.inode_cache = inode_cache
233 self.encoding = encoding
234 self.deferred_invalidations = []
236 def __getitem__(self, item):
237 return self._entries[item]
239 def __setitem__(self, key, item):
240 self._entries[key] = item
# NOTE(review): the `def` lines for the next two returns are missing
# (presumably __iter__ and items()). dict.iterkeys() exists only on Python 2.
243 return self._entries.iterkeys()
246 return self._entries.items()
248 def __contains__(self, k):
249 return k in self._entries
# Stamp the entry's access time and refresh it in the inode cache.
251 def touch(self, entry):
252 entry._atime = time.time()
253 self.inode_cache.touch(entry)
# Allocate the next inode number for `entry`, store it in the table and hand
# it to the inode cache.
# NOTE(review): the statement guarded by the ROOT_INODE check, and any return
# value, are missing from this copy.
255 def add_entry(self, entry):
256 entry.inode = next(self._counter)
257 if entry.inode == llfuse.ROOT_INODE:
259 self._entries[entry.inode] = entry
260 self.inode_cache.manage(entry)
# When the kernel holds no references (ref_count == 0): unmanage the entry
# in the cache, drop it from the table, then do further work with the llfuse
# lock released (that body is missing from this copy). Otherwise,
# presumably, only the debug message below is logged.
263 def del_entry(self, entry):
264 if entry.ref_count == 0:
265 self.inode_cache.unmanage(entry)
266 del self._entries[entry.inode]
267 with llfuse.lock_released:
272 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
# Ask the kernel to invalidate its cached state for this inode.
274 def invalidate_inode(self, entry):
275 if entry.has_ref(False):
276 # Only necessary if the kernel has previously done a lookup on this
277 # inode and hasn't yet forgotten about it.
278 llfuse.invalidate_inode(entry.inode)
# Ask the kernel to invalidate its cached directory entry `name` under this
# inode; the name is encoded with self.encoding first.
280 def invalidate_entry(self, entry, name):
281 if entry.has_ref(False):
282 # Only necessary if the kernel has previously done a lookup on this
283 # inode and hasn't yet forgotten about it.
284 llfuse.invalidate_entry(entry.inode, name.encode(self.encoding))
# NOTE(review): the `def` line (presumably `def clear(self):`) and the `try:`
# around what is presumably each entry's finalize() call are missing from
# this copy. Failures are logged per inode rather than raised; the exception
# variable `e` is unused.
287 self.inode_cache.clear()
289 for k,v in self._entries.items():
292 except Exception as e:
293 _logger.exception("Error during finalize of inode %i", k)
295 self._entries.clear()
def catch_exceptions(orig_func):
    """Catch uncaught exceptions and log them consistently.

    Decorator for FUSE request handlers. Every exception escaping the wrapped
    handler is turned into an llfuse.FUSEError, so llfuse can answer the
    kernel with an errno instead of crashing the handler thread:

    - llfuse.FUSEError is re-raised unchanged.
    - EnvironmentError (OSError/IOError) is mapped to its own errno, or to
      EIO when it carries no usable errno.
    - Keep write errors and block-not-found errors are logged and mapped
      to EIO.
    - Any other Exception is logged with a traceback and mapped to EIO.
    """

    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            raise
        except EnvironmentError as e:
            # An EnvironmentError constructed without an errno has
            # e.errno == None, which is not a valid FUSE error code (and 0
            # would signal success); fall back to EIO in those cases.
            raise llfuse.FUSEError(e.errno if e.errno else errno.EIO)
        except arvados.errors.KeepWriteError as e:
            _logger.error("Keep write error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except arvados.errors.NotFoundError as e:
            _logger.error("Block not found error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            # Exception rather than a bare except: let SystemExit and
            # KeyboardInterrupt propagate instead of reporting them as EIO.
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
322 class Operations(llfuse.Operations):
323 """This is the main interface with llfuse.
325 The methods on this object are called by llfuse threads to service FUSE
326 events to query and read from the file system.
328 llfuse has its own global lock which is acquired before calling a request handler,
329 so request handlers do not run concurrently unless the lock is explicitly released
330 using 'with llfuse.lock_released:'
# uid/gid: presumably reported as the owner of every entry (getattr() reads
# self.uid / self.gid). api_client is stored as self._api_client.
# encoding: name encoding for the inode table. inode_cache: InodeCache to
# use; a 256 MiB one is created, presumably when none is supplied.
# num_retries: passed to file reads/writes. enable_write: when false,
# operations that go through _check_writable() fail with EROFS.
# NOTE(review): the `if` guarding the default InodeCache and the assignments
# of self.uid / self.gid are not present in this copy.
334 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
335 super(Operations, self).__init__()
337 self._api_client = api_client
340 inode_cache = InodeCache(cap=256*1024*1024)
341 self.inodes = Inodes(inode_cache, encoding=encoding)
344 self.enable_write = enable_write
346 # dict of file handle number to Handle (FileHandle or DirectoryHandle)
347 self._filehandles = {}
348 self._filehandles_counter = itertools.count(0)
350 # Other threads that need to wait until the fuse driver
351 # is fully initialized should wait() on this event object.
352 self.initlock = threading.Event()
354 # If we get overlapping shutdown events (e.g., fusermount -u
355 # -z and operations.destroy()) llfuse calls forget() on inodes
356 # that have already been deleted. To avoid this, we make
357 # forget() a no-op if called after destroy().
358 self._shutdown_started = threading.Event()
360 self.num_retries = num_retries
# Byte counters (read_counter / write_counter) and call counters
# (read_ops_counter / write_ops_counter), updated by read() and write().
362 self.read_counter = arvados.keep.Counter()
363 self.write_counter = arvados.keep.Counter()
364 self.read_ops_counter = arvados.keep.Counter()
365 self.write_ops_counter = arvados.keep.Counter()
# NOTE(review): the method these comments belong to (presumably init(),
# which would set self.initlock) and its body are missing from this copy.
370 # Allow threads that are waiting for the driver to be finished
371 # initializing to continue
# Shutdown path. Setting _shutdown_started makes forget() a no-op from here on.
# NOTE(review): the `def` line (presumably destroy(), as referenced in
# __init__) and several body lines are missing from this copy.
376 self._shutdown_started.set()
381 # Different versions of llfuse require and forbid us to
382 # acquire the lock here. See #8345#note-37, #10805#note-9.
383 if LLFUSE_VERSION_0 and llfuse.lock.acquire():
386 llfuse.lock.release()
# FUSE access() handler.
# NOTE(review): its body is missing from this copy.
391 def access(self, inode, mode, ctx):
# Subscribe to the Arvados event bus for create/update/delete events;
# presumably on_event() is registered as the callback.
# NOTE(review): some arguments of the subscribe() call are missing from this
# copy.
394 def listen_for_events(self):
395 self.events = arvados.events.subscribe(
397 [["event_type", "in", ["create", "update", "delete"]]],
# Handle one event-bus message. Events other than create/update/delete are
# ignored. For each cached object matching ev["object_uuid"]: if it is a
# writable collection with local modifications, it is updated to the record
# version identified by (event_at, portable_data_hash), unless the new
# attributes mark it trashed. Cached objects for the old and new owner uuids
# then receive child_event(ev).
# NOTE(review): ev["object_uuid"] raises KeyError when an event lacks it,
# unlike the .get() lookups used for the other fields. Several lines (the
# early return, locking, the `for parent in (` header) are missing from this
# copy.
401 def on_event(self, ev):
402 if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
405 properties = ev.get("properties") or {}
406 old_attrs = properties.get("old_attributes") or {}
407 new_attrs = properties.get("new_attributes") or {}
409 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
411 if ev.get("object_kind") == "arvados#collection":
412 pdh = new_attrs.get("portable_data_hash")
413 # new_attributes.modified_at currently lacks
414 # subsecond precision (see #6347) so use event_at
415 # which should always be the same.
416 stamp = ev.get("event_at")
417 if (stamp and pdh and item.writable() and
418 item.collection is not None and
419 item.collection.modified() and
420 new_attrs.get("is_trashed") is not True):
421 item.update(to_record_version=(stamp, pdh))
423 oldowner = old_attrs.get("owner_uuid")
424 newowner = ev.get("object_owner_uuid")
426 self.inodes.inode_cache.find_by_uuid(oldowner) +
427 self.inodes.inode_cache.find_by_uuid(newowner)):
428 parent.child_event(ev)
def getattr(self, inode, ctx=None):
    """Return an llfuse.EntryAttributes describing `inode`.

    Raises llfuse.FUSEError(ENOENT) if the inode is not in the inode table.
    lookup(), setattr(), readdir() and create() build their replies from the
    returned object, so it must carry st_ino and be returned.
    """
    if inode not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)

    e = self.inodes[inode]

    entry = llfuse.EntryAttributes()
    entry.st_ino = inode
    entry.generation = 0
    entry.entry_timeout = 0
    entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0

    # Everything is readable by everyone; directories are also searchable and
    # Keep-backed files are marked executable.
    entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    if isinstance(e, Directory):
        entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
    else:
        entry.st_mode |= stat.S_IFREG
        if isinstance(e, FuseArvadosFile):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    if self.enable_write and e.writable():
        entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

    entry.st_nlink = 1
    entry.st_uid = self.uid
    entry.st_gid = self.gid
    entry.st_rdev = 0

    entry.st_size = e.size()

    entry.st_blksize = 512
    # Floor division keeps st_blocks an integer on both Python 2 and 3.
    entry.st_blocks = (entry.st_size // 512) + 1

    # atime()/mtime() are methods returning seconds as a number.
    atime = e.atime()
    mtime = e.mtime()
    if hasattr(entry, 'st_atime_ns'):
        # llfuse >= 0.42: nanosecond fields.
        entry.st_atime_ns = int(atime * 1000000000)
        entry.st_mtime_ns = int(mtime * 1000000000)
        entry.st_ctime_ns = int(mtime * 1000000000)
    else:
        # llfuse < 0.42: whole-second fields. The previous code passed the
        # bound methods (e.atime, e.mtime) to int(), which raises TypeError.
        entry.st_atime = int(atime)
        entry.st_mtime = int(mtime)
        entry.st_ctime = int(mtime)

    return entry
# FUSE setattr(): in the lines shown, only size changes are acted on. A
# FuseArvadosFile is truncated to attr.st_size (with the llfuse lock
# released), and the reply's st_size is refreshed from the file. `fields`
# (newer llfuse) says which attributes to update; without it, a non-None
# attr.st_size is taken as the request to resize.
# NOTE(review): the branching lines around `fields`, the use of `handle`, and
# the final return are missing from this copy.
477 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
478 entry = self.getattr(inode)
480 if fh is not None and fh in self._filehandles:
481 handle = self._filehandles[fh]
484 e = self.inodes[inode]
488 update_size = attr.st_size is not None
491 update_size = fields.update_size
492 if update_size and isinstance(e, FuseArvadosFile):
493 with llfuse.lock_released:
494 e.arvfile.truncate(attr.st_size)
495 entry.st_size = e.arvfile.size()
# FUSE lookup(): resolve `name` inside `parent_inode`. The name is decoded
# with the inode table's encoding (unicode() is Python 2 only). On success,
# the target's reference count is incremented (balanced by forget()) and its
# attributes are returned; otherwise ENOENT is raised.
# NOTE(review): the branch that assigns p.parent_inode (presumably for "..")
# and other lines are missing from this copy.
500 def lookup(self, parent_inode, name, ctx=None):
501 name = unicode(name, self.inodes.encoding)
507 if parent_inode in self.inodes:
508 p = self.inodes[parent_inode]
511 inode = p.parent_inode
512 elif isinstance(p, Directory) and name in p:
513 inode = p[name].inode
516 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
517 parent_inode, name, inode)
518 self.inodes[inode].inc_ref()
519 return self.getattr(inode)
521 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
523 raise llfuse.FUSEError(errno.ENOENT)
# FUSE forget(): the kernel drops `nlookup` references for each inode. After
# shutdown has started this is meant to be a no-op (the `return` under the
# check is missing in this copy). An entry is deleted from the inode table
# once its count reaches 0 and it is marked dead.
# NOTE(review): self.inodes[inode] raises KeyError for an inode no longer in
# the table.
526 def forget(self, inodes):
527 if self._shutdown_started.is_set():
529 for inode, nlookup in inodes:
530 ent = self.inodes[inode]
531 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
532 if ent.dec_ref(nlookup) == 0 and ent.dead:
533 self.inodes.del_entry(ent)
# FUSE open(): returns a new file handle number for a non-directory inode.
# Raises ENOENT for an unknown inode, EISDIR for a directory, and EPERM when
# write access is requested on a non-writable object. It then walks up
# through the parent inodes (stopping at an inode that is its own parent),
# presumably calling checkupdate() on each, as the comment below explains.
# NOTE(review): the `else:` lines, the checkupdate() call and the return
# are missing from this copy.
536 def open(self, inode, flags, ctx=None):
537 if inode in self.inodes:
538 p = self.inodes[inode]
540 raise llfuse.FUSEError(errno.ENOENT)
542 if isinstance(p, Directory):
543 raise llfuse.FUSEError(errno.EISDIR)
545 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
546 raise llfuse.FUSEError(errno.EPERM)
548 fh = next(self._filehandles_counter)
549 self._filehandles[fh] = FileHandle(fh, p)
552 # Normally, we will have received an "update" event if the
553 # parent collection is stale here. However, even if the parent
554 # collection hasn't changed, the manifest might have been
555 # fetched so long ago that the signatures on the data block
556 # locators have expired. Calling checkupdate() on all
557 # ancestors ensures the signatures will be refreshed if
559 while p.parent_inode in self.inodes:
560 if p == self.inodes[p.parent_inode]:
562 p = self.inodes[p.parent_inode]
566 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
# FUSE read(): read `size` bytes at offset `off` from the file behind handle
# `fh`. Counts the call, raises EBADF for an unknown handle, touches the
# object in the inode cache, and adds the number of bytes read to
# read_counter.
# NOTE(review): the `else:` and the return of `r` are missing from this copy.
571 def read(self, fh, off, size):
572 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
573 self.read_ops_counter.add(1)
575 if fh in self._filehandles:
576 handle = self._filehandles[fh]
578 raise llfuse.FUSEError(errno.EBADF)
580 self.inodes.touch(handle.obj)
582 r = handle.obj.readfrom(off, size, self.num_retries)
584 self.read_counter.add(len(r))
# FUSE write(): write `buf` at offset `off` through handle `fh`. Counts the
# call, raises EBADF for an unknown handle and EPERM if the object is not
# writable, touches it in the inode cache, and adds writeto()'s result to
# write_counter.
# NOTE(review): the `else:` and the return of `w` are missing from this copy.
588 def write(self, fh, off, buf):
589 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
590 self.write_ops_counter.add(1)
592 if fh in self._filehandles:
593 handle = self._filehandles[fh]
595 raise llfuse.FUSEError(errno.EBADF)
597 if not handle.obj.writable():
598 raise llfuse.FUSEError(errno.EPERM)
600 self.inodes.touch(handle.obj)
602 w = handle.obj.writeto(off, buf, self.num_retries)
604 self.write_counter.add(w)
# FUSE release(): flush and release the handle, forget it, then let the
# inode cache evict entries if it is over its limit.
# NOTE(review): the error handling around flush() (lines between the flush
# and release calls) is missing from this copy.
608 def release(self, fh):
609 if fh in self._filehandles:
610 _logger.debug("arv-mount release fh %i", fh)
612 self._filehandles[fh].flush()
616 self._filehandles[fh].release()
617 del self._filehandles[fh]
618 self.inodes.inode_cache.cap_cache()
# FUSE releasedir(): close a directory handle.
# NOTE(review): its body is missing from this copy (presumably it delegates
# to release()).
620 def releasedir(self, fh):
# FUSE opendir(): snapshot the directory listing into a DirectoryHandle.
# The listing is [('.', dir), ('..', parent)] followed by the directory's
# items. Raises ENOENT for an unknown inode, ENOTDIR for a non-directory,
# and EIO if the parent inode is not in the table. A handle number is drawn
# from the counter before the parent check, so an EIO leaves a gap in the
# numbering (harmless).
# NOTE(review): the `else:` lines, any refresh before listing, and the
# return are missing from this copy.
624 def opendir(self, inode, ctx=None):
625 _logger.debug("arv-mount opendir: inode %i", inode)
627 if inode in self.inodes:
628 p = self.inodes[inode]
630 raise llfuse.FUSEError(errno.ENOENT)
632 if not isinstance(p, Directory):
633 raise llfuse.FUSEError(errno.ENOTDIR)
635 fh = next(self._filehandles_counter)
636 if p.parent_inode in self.inodes:
637 parent = self.inodes[p.parent_inode]
639 raise llfuse.FUSEError(errno.EIO)
644 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
# FUSE readdir(): generator yielding (encoded name, attributes, next offset)
# for each entry of the handle's snapshot from index `off` onward. Entries
# whose inode has since left the inode table are skipped. Raises EBADF for
# an unknown handle.
# NOTE(review): the initialisation of `e` (presumably from `off`) and its
# increment are missing from this copy.
648 def readdir(self, fh, off):
649 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
651 if fh in self._filehandles:
652 handle = self._filehandles[fh]
654 raise llfuse.FUSEError(errno.EBADF)
657 while e < len(handle.entries):
658 if handle.entries[e][1].inode in self.inodes:
659 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
# FUSE statfs(): report file system statistics with a 128 KiB block size.
# NOTE(review): the remaining fields and the return are missing from this
# copy.
663 def statfs(self, ctx=None):
664 st = llfuse.StatvfsData()
665 st.f_bsize = 128 * 1024
# Validate that `inode_parent` can be modified. Raises EROFS when the mount
# is read-only, ENOENT for an unknown inode, ENOTDIR for a non-directory,
# and EPERM otherwise (presumably when the directory is not writable).
# create(), mkdir(), unlink(), rmdir() and rename() use the returned
# directory object.
# NOTE(review): the `else:`, the writability test before EPERM, and the
# `return p` are missing from this copy.
678 def _check_writable(self, inode_parent):
679 if not self.enable_write:
680 raise llfuse.FUSEError(errno.EROFS)
682 if inode_parent in self.inodes:
683 p = self.inodes[inode_parent]
685 raise llfuse.FUSEError(errno.ENOENT)
687 if not isinstance(p, Directory):
688 raise llfuse.FUSEError(errno.ENOTDIR)
691 raise llfuse.FUSEError(errno.EPERM)
# FUSE create(): create file `name` in a writable directory and open it.
# Returns (handle number, attributes of the new file).
# NOTE(review): the calls that create the file and bind `f` are missing from
# this copy.
696 def create(self, inode_parent, name, mode, flags, ctx=None):
697 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
699 p = self._check_writable(inode_parent)
702 # The file entry should have been implicitly created by callback.
704 fh = next(self._filehandles_counter)
705 self._filehandles[fh] = FileHandle(fh, f)
709 return (fh, self.getattr(f.inode))
# FUSE mkdir(): create directory `name` in a writable directory and return
# its attributes.
# NOTE(review): the calls that create the directory and bind `d` are
# missing from this copy.
712 def mkdir(self, inode_parent, name, mode, ctx=None):
713 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
715 p = self._check_writable(inode_parent)
718 # The dir entry should have been implicitly created by callback.
722 return self.getattr(d.inode)
# FUSE unlink(): remove file `name` from a writable directory.
# NOTE(review): the call on `p` that performs the removal is missing from
# this copy.
725 def unlink(self, inode_parent, name, ctx=None):
726 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
727 p = self._check_writable(inode_parent)
# FUSE rmdir(): remove directory `name` from a writable directory.
# NOTE(review): the call on `p` that performs the removal is missing from
# this copy.
731 def rmdir(self, inode_parent, name, ctx=None):
732 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
733 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
    """Move entry `name_old` of one directory to `name_new` in another.

    Both parent inodes are validated with _check_writable() (old parent
    first, then new parent); any llfuse.FUSEError from those checks
    propagates. The destination directory then performs the move.
    """
    _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    old_dir = self._check_writable(inode_parent_old)
    new_dir = self._check_writable(inode_parent_new)
    # The destination drives the operation and is told where the entry
    # comes from.
    new_dir.rename(name_old, name_new, old_dir)
# FUSE flush(): flush the handle `fh` if it is still open; unknown handles
# are ignored.
# NOTE(review): the `def` line (presumably `def flush(self, fh):`) is
# missing from this copy.
745 if fh in self._filehandles:
746 self._filehandles[fh].flush()
# FUSE fsync(): sync the file behind handle `fh`.
# NOTE(review): its body is missing from this copy (presumably it delegates
# to flush()).
748 def fsync(self, fh, datasync):
751 def fsyncdir(self, fh, datasync):