1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
77 # Default _notify_queue has a limit of 1000 items, but it really needs to be
78 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
81 if hasattr(llfuse, 'capi'):
83 llfuse.capi._notify_queue = Queue.Queue()
86 llfuse._notify_queue = Queue.Queue()
88 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
90 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
91 from fusefile import StringFile, FuseArvadosFile
93 _logger = logging.getLogger('arvados.arvados_fuse')
95 # Uncomment this to enable llfuse debug logging.
96 # log_handler = logging.StreamHandler()
97 # llogger = logging.getLogger('llfuse')
98 # llogger.addHandler(log_handler)
99 # llogger.setLevel(logging.DEBUG)
101 class Handle(object):
102 """Connects a numeric file handle to a File or Directory object that has
103 been opened by the client."""
105 def __init__(self, fh, obj):
117 class FileHandle(Handle):
118 """Connects a numeric file handle to a File object that has
119 been opened by the client."""
122 if self.obj.writable():
123 return self.obj.flush()
class DirectoryHandle(Handle):
    """Handle for a Directory object that a client currently has open.

    In addition to the file handle number and directory object passed on
    to the base class, the handle keeps `entries`, the sequence of
    directory entries supplied by the caller.
    """

    def __init__(self, fh, dirobj, entries):
        # Base class initialisation comes first, then the listing is attached.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
135 class InodeCache(object):
136 """Records the memory footprint of objects and when they are last used.
138 When the cache limit is exceeded, the least recently used objects are
139 cleared. Clearing the object means discarding its contents to release
140 memory. The next time the object is accessed, it must be re-fetched from
141 the server. Note that the inode cache limit is a soft limit; the cache
142 limit may be exceeded if necessary to load very large objects, it may also
143 be exceeded if open file handles prevent objects from being cleared.
147 def __init__(self, cap, min_entries=4):
148 self._entries = collections.OrderedDict()
152 self.min_entries = min_entries
157 def _remove(self, obj, clear):
160 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
162 if obj.has_ref(True):
163 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
165 obj.kernel_invalidate()
166 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
169 # The llfuse lock is released in del_entry(), which is called by
170 # Directory.clear(). While the llfuse lock is released, it can happen
171 # that a reentrant call removes this entry before this call gets to it.
172 # Ensure that the entry is still valid before trying to remove it.
173 if obj.inode not in self._entries:
176 self._total -= obj.cache_size
177 del self._entries[obj.inode]
179 self._by_uuid[obj.cache_uuid].remove(obj)
180 if not self._by_uuid[obj.cache_uuid]:
181 del self._by_uuid[obj.cache_uuid]
182 obj.cache_uuid = None
184 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
187 if self._total > self.cap:
188 for ent in self._entries.values():
189 if self._total < self.cap or len(self._entries) < self.min_entries:
191 self._remove(ent, True)
193 def manage(self, obj):
195 obj.cache_size = obj.objsize()
196 self._entries[obj.inode] = obj
197 obj.cache_uuid = obj.uuid()
199 if obj.cache_uuid not in self._by_uuid:
200 self._by_uuid[obj.cache_uuid] = [obj]
202 if obj not in self._by_uuid[obj.cache_uuid]:
203 self._by_uuid[obj.cache_uuid].append(obj)
204 self._total += obj.objsize()
205 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
208 def touch(self, obj):
210 if obj.inode in self._entries:
211 self._remove(obj, False)
def unmanage(self, obj):
    """Remove `obj` from the cache if it is persisted and currently tracked.

    Nothing happens when `obj.persisted()` is false, or when the object's
    inode is not a key of the cache's entry table. Otherwise the object is
    handed to `_remove()` with the clear flag set to True.
    """
    if not obj.persisted():
        return
    if obj.inode not in self._entries:
        return
    self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return the list of cached objects registered under `uuid`.

    The stored list itself is returned when the uuid is known; a new empty
    list is returned when it is not. The lookup never adds a key.
    """
    registry = self._by_uuid
    if uuid in registry:
        return registry[uuid]
    return []
222 self._entries.clear()
223 self._by_uuid.clear()
226 class Inodes(object):
227 """Manage the set of inodes. This is the mapping from a numeric id
228 to a concrete File or Directory object"""
230 def __init__(self, inode_cache, encoding="utf-8"):
232 self._counter = itertools.count(llfuse.ROOT_INODE)
233 self.inode_cache = inode_cache
234 self.encoding = encoding
235 self.deferred_invalidations = []
237 def __getitem__(self, item):
238 return self._entries[item]
240 def __setitem__(self, key, item):
241 self._entries[key] = item
244 return self._entries.iterkeys()
247 return self._entries.items()
249 def __contains__(self, k):
250 return k in self._entries
def touch(self, entry):
    """Record that `entry` was just used.

    The entry's `_atime` is set to the current time and the entry is then
    passed to the inode cache's `touch()`.
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
256 def add_entry(self, entry):
257 entry.inode = next(self._counter)
258 if entry.inode == llfuse.ROOT_INODE:
260 self._entries[entry.inode] = entry
261 self.inode_cache.manage(entry)
264 def del_entry(self, entry):
265 if entry.ref_count == 0:
266 self.inode_cache.unmanage(entry)
267 del self._entries[entry.inode]
268 with llfuse.lock_released:
273 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, entry):
    """Ask llfuse to invalidate the inode of `entry`.

    Nothing is sent when `entry.has_ref(False)` is false.
    """
    if not entry.has_ref(False):
        # The kernel only needs to be told if it has previously looked this
        # inode up and has not forgotten about it since.
        return
    llfuse.invalidate_inode(entry.inode)
def invalidate_entry(self, entry, name):
    """Ask llfuse to invalidate the directory entry `name` under `entry`.

    `name` is encoded with `self.encoding` before it is passed to llfuse.
    Nothing is sent when `entry.has_ref(False)` is false.
    """
    if not entry.has_ref(False):
        # The kernel only needs to be told if it has previously looked this
        # inode up and has not forgotten about it since.
        return
    llfuse.invalidate_entry(
        entry.inode,
        name.encode(self.encoding))
288 self.inode_cache.clear()
290 for k,v in self._entries.items():
293 except Exception as e:
294 _logger.exception("Error during finalize of inode %i", k)
296 self._entries.clear()
299 def catch_exceptions(orig_func):
300 """Catch uncaught exceptions and log them consistently."""
302 @functools.wraps(orig_func)
303 def catch_exceptions_wrapper(self, *args, **kwargs):
305 return orig_func(self, *args, **kwargs)
306 except llfuse.FUSEError:
308 except EnvironmentError as e:
309 raise llfuse.FUSEError(e.errno)
310 except arvados.errors.KeepWriteError as e:
311 _logger.error("Keep write error: " + str(e))
312 raise llfuse.FUSEError(errno.EIO)
313 except arvados.errors.NotFoundError as e:
314 _logger.error("Block not found error: " + str(e))
315 raise llfuse.FUSEError(errno.EIO)
317 _logger.exception("Unhandled exception during FUSE operation")
318 raise llfuse.FUSEError(errno.EIO)
320 return catch_exceptions_wrapper
323 class Operations(llfuse.Operations):
324 """This is the main interface with llfuse.
326 The methods on this object are called by llfuse threads to service FUSE
327 events to query and read from the file system.
329 llfuse has its own global lock which is acquired before calling a request handler,
330 so request handlers do not run concurrently unless the lock is explicitly released
331 using 'with llfuse.lock_released:'
335 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
336 super(Operations, self).__init__()
338 self._api_client = api_client
341 inode_cache = InodeCache(cap=256*1024*1024)
342 self.inodes = Inodes(inode_cache, encoding=encoding)
345 self.enable_write = enable_write
347 # dict of file handle number to Handle object (FileHandle or DirectoryHandle)
348 self._filehandles = {}
349 self._filehandles_counter = itertools.count(0)
351 # Other threads that need to wait until the fuse driver
352 # is fully initialized should wait() on this event object.
353 self.initlock = threading.Event()
355 # If we get overlapping shutdown events (e.g., fusermount -u
356 # -z and operations.destroy()) llfuse calls forget() on inodes
357 # that have already been deleted. To avoid this, we make
358 # forget() a no-op if called after destroy().
359 self._shutdown_started = threading.Event()
361 self.num_retries = num_retries
363 self.read_counter = arvados.keep.Counter()
364 self.write_counter = arvados.keep.Counter()
365 self.read_ops_counter = arvados.keep.Counter()
366 self.write_ops_counter = arvados.keep.Counter()
371 # Allow threads that are waiting for the driver to be finished
372 # initializing to continue
377 self._shutdown_started.set()
382 # Different versions of llfuse require and forbid us to
383 # acquire the lock here. See #8345#note-37, #10805#note-9.
384 if LLFUSE_VERSION_0 and llfuse.lock.acquire():
387 llfuse.lock.release()
392 def access(self, inode, mode, ctx):
395 def listen_for_events(self):
396 self.events = arvados.events.subscribe(
398 [["event_type", "in", ["create", "update", "delete"]]],
402 def on_event(self, ev):
403 if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
406 properties = ev.get("properties") or {}
407 old_attrs = properties.get("old_attributes") or {}
408 new_attrs = properties.get("new_attributes") or {}
410 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
412 if ev.get("object_kind") == "arvados#collection":
413 pdh = new_attrs.get("portable_data_hash")
414 # new_attributes.modified_at currently lacks
415 # subsecond precision (see #6347) so use event_at
416 # which should always be the same.
417 stamp = ev.get("event_at")
418 if (stamp and pdh and item.writable() and
419 item.collection is not None and
420 item.collection.modified() and
421 new_attrs.get("is_trashed") is not True):
422 item.update(to_record_version=(stamp, pdh))
424 oldowner = old_attrs.get("owner_uuid")
425 newowner = ev.get("object_owner_uuid")
427 self.inodes.inode_cache.find_by_uuid(oldowner) +
428 self.inodes.inode_cache.find_by_uuid(newowner)):
429 parent.child_event(ev)
432 def getattr(self, inode, ctx=None):
433 if inode not in self.inodes:
434 raise llfuse.FUSEError(errno.ENOENT)
436 e = self.inodes[inode]
438 entry = llfuse.EntryAttributes()
441 entry.entry_timeout = 0
442 entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
444 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
445 if isinstance(e, Directory):
446 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
448 entry.st_mode |= stat.S_IFREG
449 if isinstance(e, FuseArvadosFile):
450 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
452 if self.enable_write and e.writable():
453 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
456 entry.st_uid = self.uid
457 entry.st_gid = self.gid
460 entry.st_size = e.size()
462 entry.st_blksize = 512
463 entry.st_blocks = (entry.st_size/512)+1
464 if hasattr(entry, 'st_atime_ns'):
466 entry.st_atime_ns = int(e.atime() * 1000000000)
467 entry.st_mtime_ns = int(e.mtime() * 1000000000)
468 entry.st_ctime_ns = int(e.mtime() * 1000000000)
471 entry.st_atime = int(e.atime)
472 entry.st_mtime = int(e.mtime)
473 entry.st_ctime = int(e.mtime)
478 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
479 entry = self.getattr(inode)
481 if fh is not None and fh in self._filehandles:
482 handle = self._filehandles[fh]
485 e = self.inodes[inode]
489 update_size = attr.st_size is not None
492 update_size = fields.update_size
493 if update_size and isinstance(e, FuseArvadosFile):
494 with llfuse.lock_released:
495 e.arvfile.truncate(attr.st_size)
496 entry.st_size = e.arvfile.size()
501 def lookup(self, parent_inode, name, ctx=None):
502 name = unicode(name, self.inodes.encoding)
508 if parent_inode in self.inodes:
509 p = self.inodes[parent_inode]
512 inode = p.parent_inode
513 elif isinstance(p, Directory) and name in p:
514 inode = p[name].inode
517 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
518 parent_inode, name, inode)
519 self.inodes[inode].inc_ref()
520 return self.getattr(inode)
522 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
524 raise llfuse.FUSEError(errno.ENOENT)
527 def forget(self, inodes):
528 if self._shutdown_started.is_set():
530 for inode, nlookup in inodes:
531 ent = self.inodes[inode]
532 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
533 if ent.dec_ref(nlookup) == 0 and ent.dead:
534 self.inodes.del_entry(ent)
537 def open(self, inode, flags, ctx=None):
538 if inode in self.inodes:
539 p = self.inodes[inode]
541 raise llfuse.FUSEError(errno.ENOENT)
543 if isinstance(p, Directory):
544 raise llfuse.FUSEError(errno.EISDIR)
546 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
547 raise llfuse.FUSEError(errno.EPERM)
549 fh = next(self._filehandles_counter)
550 self._filehandles[fh] = FileHandle(fh, p)
553 # Normally, we will have received an "update" event if the
554 # parent collection is stale here. However, even if the parent
555 # collection hasn't changed, the manifest might have been
556 # fetched so long ago that the signatures on the data block
557 # locators have expired. Calling checkupdate() on all
558 # ancestors ensures the signatures will be refreshed if
560 while p.parent_inode in self.inodes:
561 if p == self.inodes[p.parent_inode]:
563 p = self.inodes[p.parent_inode]
567 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
572 def read(self, fh, off, size):
573 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
574 self.read_ops_counter.add(1)
576 if fh in self._filehandles:
577 handle = self._filehandles[fh]
579 raise llfuse.FUSEError(errno.EBADF)
581 self.inodes.touch(handle.obj)
583 r = handle.obj.readfrom(off, size, self.num_retries)
585 self.read_counter.add(len(r))
589 def write(self, fh, off, buf):
590 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
591 self.write_ops_counter.add(1)
593 if fh in self._filehandles:
594 handle = self._filehandles[fh]
596 raise llfuse.FUSEError(errno.EBADF)
598 if not handle.obj.writable():
599 raise llfuse.FUSEError(errno.EPERM)
601 self.inodes.touch(handle.obj)
603 w = handle.obj.writeto(off, buf, self.num_retries)
605 self.write_counter.add(w)
609 def release(self, fh):
610 if fh in self._filehandles:
611 _logger.debug("arv-mount release fh %i", fh)
613 self._filehandles[fh].flush()
617 self._filehandles[fh].release()
618 del self._filehandles[fh]
619 self.inodes.inode_cache.cap_cache()
621 def releasedir(self, fh):
625 def opendir(self, inode, ctx=None):
626 _logger.debug("arv-mount opendir: inode %i", inode)
628 if inode in self.inodes:
629 p = self.inodes[inode]
631 raise llfuse.FUSEError(errno.ENOENT)
633 if not isinstance(p, Directory):
634 raise llfuse.FUSEError(errno.ENOTDIR)
636 fh = next(self._filehandles_counter)
637 if p.parent_inode in self.inodes:
638 parent = self.inodes[p.parent_inode]
640 raise llfuse.FUSEError(errno.EIO)
645 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
649 def readdir(self, fh, off):
650 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
652 if fh in self._filehandles:
653 handle = self._filehandles[fh]
655 raise llfuse.FUSEError(errno.EBADF)
658 while e < len(handle.entries):
659 if handle.entries[e][1].inode in self.inodes:
660 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
664 def statfs(self, ctx=None):
665 st = llfuse.StatvfsData()
666 st.f_bsize = 128 * 1024
679 def _check_writable(self, inode_parent):
680 if not self.enable_write:
681 raise llfuse.FUSEError(errno.EROFS)
683 if inode_parent in self.inodes:
684 p = self.inodes[inode_parent]
686 raise llfuse.FUSEError(errno.ENOENT)
688 if not isinstance(p, Directory):
689 raise llfuse.FUSEError(errno.ENOTDIR)
692 raise llfuse.FUSEError(errno.EPERM)
697 def create(self, inode_parent, name, mode, flags, ctx=None):
698 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
700 p = self._check_writable(inode_parent)
703 # The file entry should have been implicitly created by callback.
705 fh = next(self._filehandles_counter)
706 self._filehandles[fh] = FileHandle(fh, f)
710 return (fh, self.getattr(f.inode))
713 def mkdir(self, inode_parent, name, mode, ctx=None):
714 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
716 p = self._check_writable(inode_parent)
719 # The dir entry should have been implicitly created by callback.
723 return self.getattr(d.inode)
726 def unlink(self, inode_parent, name, ctx=None):
727 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
728 p = self._check_writable(inode_parent)
732 def rmdir(self, inode_parent, name, ctx=None):
733 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
734 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
    """Move `name_old` in one parent directory to `name_new` in another.

    Both parent inodes are passed through `_check_writable()`, source
    first and destination second. The move itself is delegated to the
    destination directory's `rename()`, which receives the source
    directory as its last argument.
    """
    _logger.debug(
        "arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'",
        inode_parent_old, name_old, inode_parent_new, name_new)
    old_dir = self._check_writable(inode_parent_old)
    new_dir = self._check_writable(inode_parent_new)
    new_dir.rename(name_old, name_new, old_dir)
746 if fh in self._filehandles:
747 self._filehandles[fh].flush()
749 def fsync(self, fh, datasync):
752 def fsyncdir(self, fh, datasync):