# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

"""FUSE driver for Arvados Keep

Architecture:

There is one `Operations` object per mount point.  It is the entry point for all
read and write requests from the llfuse module.

The operations object owns an `Inodes` object.  The inodes object stores the
mapping from numeric inode (used throughout the file system API to uniquely
identify files) to the Python objects that implement files and directories.

The `Inodes` object owns an `InodeCache` object.  The inode cache records the
memory footprint of file system objects and when they are last used.  When the
cache limit is exceeded, the least recently used objects are cleared.

File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.

File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
which implement actual reads and writes.

Directory objects inherit from `fusedir.Directory`.  The directory object wraps
a Python dict which stores the mapping from filenames to directory entries.
Directory contents can be accessed through the Python operators such as `[]`
and `in`.  These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before
returning a result.

The general FUSE operation flow is as follows:

- The request handler is called with either an inode or file handle that is the
  subject of the operation.

- Look up the inode using the Inodes table or the file handle in the
  filehandles table to get the file system object.

- For methods that alter files or directories, check that the operation is
  valid and permitted using _check_writable().

- Call the relevant method on the file system object.

Misc features:

The FUSE driver supports the Arvados event bus.  When an event is received for
an object that is live in the inode cache, that object is immediately updated.

"""
# NOTE(review): the import statements of this module fall in a gap of the
# listing.  The imports below are exactly the modules the visible code refers
# to; confirm against the real file (it may import more, possibly for side
# effects).
import collections
import errno
import functools
import itertools
import logging
import os
import stat
import threading
import time
import Queue

import llfuse

import arvados
import arvados.errors
import arvados.events
import arvados.keep

# Default _notify_queue has a limit of 1000 items, but it really needs to be
# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# details.
if hasattr(llfuse, 'capi'):
    # This llfuse release keeps the queue on its `capi` submodule.
    llfuse.capi._notify_queue = Queue.Queue()
else:
    # Otherwise the queue is an attribute of the llfuse package itself.
    llfuse._notify_queue = Queue.Queue()

# True for llfuse 0.x releases; consulted where the locking protocol differs
# between llfuse versions.
LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')

from fusedir import (
    sanitize_filename,
    Directory,
    CollectionDirectory,
    TmpCollectionDirectory,
    MagicDirectory,
    TagsDirectory,
    ProjectDirectory,
    SharedDirectory,
    CollectionDirectoryBase,
)
from fusefile import StringFile, FuseArvadosFile

_logger = logging.getLogger('arvados.arvados_fuse')

# Uncomment this to enable llfuse debug logging.
# log_handler = logging.StreamHandler()
# llogger = logging.getLogger('llfuse')
# llogger.addHandler(log_handler)
# llogger.setLevel(logging.DEBUG)
class Handle(object):
    """Connects a numeric file handle to a File or Directory object that has
    been opened by the client.

    NOTE(review): only the class header, this docstring and the ``__init__``
    signature survived in the listing.  The bodies below were restored from
    how handles are used elsewhere in this file (``handle.obj``,
    ``handle.release()``, ``handle.flush()``); the ``inc_use``/``dec_use``
    calls are presumed from the cache's ``in_use()`` check -- confirm.
    """

    def __init__(self, fh, obj):
        """Bind numeric handle `fh` to the opened object `obj`."""
        self.fh = fh
        self.obj = obj
        # Mark the object as in use for as long as this handle is open.
        self.obj.inc_use()  # restored

    def release(self):
        """Give up the use count taken when the handle was created."""
        self.obj.dec_use()  # restored

    def flush(self):
        """Nothing to flush for a generic handle; subclasses may override."""
        pass
class FileHandle(Handle):
    """Connects a numeric file handle to a File object that has
    been opened by the client."""

    def flush(self):
        """Flush the underlying file object if it is writable.

        Returns whatever ``obj.flush()`` returns, or None when the object is
        not writable (nothing is flushed in that case).
        """
        if self.obj.writable():
            return self.obj.flush()
class DirectoryHandle(Handle):
    """Connects a numeric file handle to a Directory object that has
    been opened by the client."""

    def __init__(self, fh, dirobj, entries):
        """Bind `fh` to `dirobj` and remember the directory listing.

        `entries` is kept as given on ``self.entries`` so that later readdir
        calls can index into a stable listing.
        """
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
class InodeCache(object):
    """Records the memory footprint of objects and when they are last used.

    When the cache limit is exceeded, the least recently used objects are
    cleared.  Clearing the object means discarding its contents to release
    memory.  The next time the object is accessed, it must be re-fetched from
    the server.  Note that the inode cache limit is a soft limit; the cache
    limit may be exceeded if necessary to load very large objects, it may also
    be exceeded if open file handles prevent objects from being cleared.

    NOTE(review): rebuilt from a listing that had dropped every short line;
    statements tagged ``# restored`` were inferred from the gaps and from the
    attributes the surviving code uses -- confirm against the real file.
    """

    def __init__(self, cap, min_entries=4):
        """Create an empty cache.

        cap -- soft limit on the summed `cache_size` of managed objects.
        min_entries -- eviction stops once fewer than this many entries remain.
        """
        # inode -> object, oldest (least recently touched) first.
        self._entries = collections.OrderedDict()
        # uuid -> list of managed objects reporting that uuid.
        self._by_uuid = {}  # restored
        self.cap = cap  # restored
        self._total = 0  # restored
        self.min_entries = min_entries

    def total(self):  # restored
        """Return the summed cache size of all managed objects."""
        return self._total

    def _remove(self, obj, clear):
        """Drop `obj` from the cache bookkeeping.

        With `clear` true the object's contents are discarded first; if the
        object is in use, or still referenced (in which case a kernel
        invalidate is sent instead), nothing is removed.
        """
        if clear:
            if obj.in_use():
                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
                return
            if obj.has_ref(True):
                obj.kernel_invalidate()
                _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
                return
            obj.clear()

        # The llfuse lock is released in del_entry(), which is called by
        # Directory.clear().  While the llfuse lock is released, it can happen
        # that a reentrant call removes this entry before this call gets to it.
        # Ensure that the entry is still valid before trying to remove it.
        if obj.inode not in self._entries:
            return

        self._total -= obj.cache_size
        del self._entries[obj.inode]
        if obj.cache_uuid:
            self._by_uuid[obj.cache_uuid].remove(obj)
            if not self._by_uuid[obj.cache_uuid]:
                del self._by_uuid[obj.cache_uuid]
            obj.cache_uuid = None
        if clear:
            _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)

    def cap_cache(self):
        """Evict least recently used entries while the cache is over `cap`."""
        if self._total > self.cap:
            # Iterate over a snapshot: _remove() deletes from self._entries,
            # and mutating an OrderedDict while iterating its live values()
            # view raises RuntimeError on Python 3.
            for ent in list(self._entries.values()):
                if self._total < self.cap or len(self._entries) < self.min_entries:
                    break
                self._remove(ent, True)

    def manage(self, obj):
        """Start tracking `obj` (persisted objects only) and enforce the cap."""
        if obj.persisted():
            obj.cache_size = obj.objsize()
            self._entries[obj.inode] = obj
            obj.cache_uuid = obj.uuid()
            if obj.cache_uuid:
                if obj.cache_uuid not in self._by_uuid:
                    self._by_uuid[obj.cache_uuid] = [obj]
                elif obj not in self._by_uuid[obj.cache_uuid]:
                    self._by_uuid[obj.cache_uuid].append(obj)
            # Add exactly the size recorded above, so that _remove() (which
            # subtracts cache_size) always balances it.
            self._total += obj.cache_size
            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.cache_size, obj.cache_uuid, self._total)
            self.cap_cache()

    def touch(self, obj):
        """Move `obj` to the most-recently-used end and refresh its size."""
        if obj.persisted():
            if obj.inode in self._entries:
                self._remove(obj, False)
            self.manage(obj)

    def unmanage(self, obj):
        """Clear `obj` and stop tracking it, if it is persisted and tracked."""
        if obj.persisted() and obj.inode in self._entries:
            self._remove(obj, True)

    def find_by_uuid(self, uuid):
        """Return the list of managed objects with this uuid (may be empty)."""
        return self._by_uuid.get(uuid, [])

    def clear(self):
        """Forget every entry without clearing the objects themselves."""
        self._entries.clear()
        self._by_uuid.clear()
        self._total = 0  # restored
class Inodes(object):
    """Manage the set of inodes.  This is the mapping from a numeric id
    to a concrete File or Directory object

    NOTE(review): rebuilt from a listing that had dropped every short line;
    statements tagged ``# restored`` were inferred from the gaps -- confirm
    against the real file.
    """

    def __init__(self, inode_cache, encoding="utf-8"):
        """Create an empty table.

        inode_cache -- cache object that is told about every entry added,
            touched or deleted here.
        encoding -- used to encode entry names passed to llfuse.
        """
        self._entries = {}  # restored
        # Inode numbers are handed out sequentially starting at the root inode.
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.inode_cache = inode_cache
        self.encoding = encoding
        self.deferred_invalidations = []

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        # Iterates inode numbers; iter(dict) is the portable spelling of
        # dict.iterkeys().
        return iter(self._entries)

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def touch(self, entry):
        """Record an access: update the entry's atime and its cache position."""
        entry._atime = time.time()
        self.inode_cache.touch(entry)

    def add_entry(self, entry):
        """Assign the next inode number to `entry`, register it, and return it."""
        entry.inode = next(self._counter)
        if entry.inode == llfuse.ROOT_INODE:
            entry.inc_ref()  # restored
        self._entries[entry.inode] = entry
        self.inode_cache.manage(entry)
        return entry  # restored

    def del_entry(self, entry):
        """Remove `entry` now if unreferenced, otherwise mark it dead."""
        if entry.ref_count == 0:
            self.inode_cache.unmanage(entry)
            del self._entries[entry.inode]
            with llfuse.lock_released:
                entry.finalize()  # restored
            self.invalidate_inode(entry.inode)
            entry.inode = None  # restored
        else:
            entry.dead = True  # restored
            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)

    def invalidate_inode(self, inode):
        llfuse.invalidate_inode(inode)

    def invalidate_entry(self, inode, name):
        llfuse.invalidate_entry(inode, name.encode(self.encoding))

    def clear(self):
        """Empty the cache, finalize every entry, then drop all entries."""
        self.inode_cache.clear()

        # Snapshot so that a finalize() touching the table cannot disturb the
        # iteration; failures are logged and do not stop the sweep.
        for k, v in list(self._entries.items()):
            try:
                v.finalize()
            except Exception:
                _logger.exception("Error during finalize of inode %i", k)

        self._entries.clear()
def catch_exceptions(orig_func):
    """Catch uncaught exceptions and log them consistently.

    Decorator for request handler methods.  The wrapped call either returns
    normally or raises llfuse.FUSEError:

    - llfuse.FUSEError passes through untouched;
    - EnvironmentError is translated using its errno (EIO if it has none);
    - Keep write errors and not-found errors are logged and become EIO;
    - anything else is logged with a traceback and becomes EIO.
    """

    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            raise
        except EnvironmentError as e:
            # An EnvironmentError built from a bare message has errno None,
            # which is not a usable error number.
            raise llfuse.FUSEError(e.errno if e.errno is not None else errno.EIO)
        except arvados.errors.KeepWriteError as e:
            _logger.error("Keep write error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except arvados.errors.NotFoundError as e:
            _logger.error("Block not found error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
class Operations(llfuse.Operations):
    """This is the main interface with llfuse.

    The methods on this object are called by llfuse threads to service FUSE
    events to query and read from the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using 'with llfuse.lock_released:'

    NOTE(review): this class was rebuilt from a listing that had dropped every
    short line (including all decorator lines).  Statements tagged
    ``# restored`` and the placement of ``@catch_exceptions`` were inferred
    from the gaps in the listing and from how the surviving code uses them --
    confirm against the real file.
    """

    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
        """Set up the inode table, file handle table and I/O counters.

        uid, gid -- owner reported for every file and directory.
        api_client -- Arvados API client, used for the event subscription.
        encoding -- encoding of file names exchanged with llfuse.
        inode_cache -- InodeCache to use; a 256 MiB cache is created if omitted.
        num_retries -- passed to file reads and writes.
        enable_write -- when false, every modifying operation fails with EROFS.
        """
        super(Operations, self).__init__()

        self._api_client = api_client

        if not inode_cache:
            inode_cache = InodeCache(cap=256*1024*1024)
        self.inodes = Inodes(inode_cache, encoding=encoding)
        self.uid = uid  # restored
        self.gid = gid  # restored
        self.enable_write = enable_write

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = itertools.count(0)

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

        # If we get overlapping shutdown events (e.g., fusermount -u
        # -z and operations.destroy()) llfuse calls forget() on inodes
        # that have already been deleted.  To avoid this, we make
        # forget() a no-op if called after destroy().
        self._shutdown_started = threading.Event()

        self.num_retries = num_retries

        self.read_counter = arvados.keep.Counter()
        self.write_counter = arvados.keep.Counter()
        self.read_ops_counter = arvados.keep.Counter()
        self.write_ops_counter = arvados.keep.Counter()

        self.events = None  # restored

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()  # restored

    @catch_exceptions
    def destroy(self):
        """Shut down: stop event delivery and finalize every inode."""
        self._shutdown_started.set()
        if self.events:  # restored
            self.events.close()  # restored
            self.events = None  # restored

        # Different versions of llfuse require and forbid us to
        # acquire the lock here. See #8345#note-37, #10805#note-9.
        if LLFUSE_VERSION_0 and llfuse.lock.acquire():
            try:
                self.inodes.clear()
            finally:
                # Never leave the global llfuse lock held if clear() fails.
                llfuse.lock.release()
        else:
            self.inodes.clear()

    def access(self, inode, mode, ctx):
        return True  # restored

    def listen_for_events(self):
        """Subscribe to create/update/delete events; they arrive in on_event()."""
        self.events = arvados.events.subscribe(
            self._api_client,  # restored
            [["event_type", "in", ["create", "update", "delete"]]],
            self.on_event)  # restored

    @catch_exceptions
    def on_event(self, ev):
        """Refresh cached objects affected by an Arvados event.

        Events other than create/update/delete are ignored.  Objects cached
        under the event's object uuid are handled first, then the cached
        old and new owners are notified through child_event().
        """
        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
            return
        with llfuse.lock:  # restored
            properties = ev.get("properties") or {}
            old_attrs = properties.get("old_attributes") or {}
            new_attrs = properties.get("new_attributes") or {}

            for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                item.invalidate()  # restored
                if ev.get("object_kind") == "arvados#collection":
                    pdh = new_attrs.get("portable_data_hash")
                    # new_attributes.modified_at currently lacks
                    # subsecond precision (see #6347) so use event_at
                    # which should always be the same.
                    stamp = ev.get("event_at")
                    if (stamp and pdh and item.writable() and
                        item.collection is not None and
                        item.collection.modified() and
                        new_attrs.get("is_trashed") is not True):
                        item.update(to_record_version=(stamp, pdh))

            oldowner = old_attrs.get("owner_uuid")
            newowner = ev.get("object_owner_uuid")
            for parent in (
                    self.inodes.inode_cache.find_by_uuid(oldowner) +
                    self.inodes.inode_cache.find_by_uuid(newowner)):
                parent.child_event(ev)

    @catch_exceptions
    def getattr(self, inode, ctx=None):
        """Build the llfuse.EntryAttributes for `inode`.

        Raises FUSEError(ENOENT) if the inode is unknown.
        """
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.st_ino = inode  # restored
        entry.generation = 0  # restored
        entry.entry_timeout = 60 if e.allow_dirent_cache else 0
        entry.attr_timeout = 60 if e.allow_attr_cache else 0

        # Everything is world-readable; directories and Arvados files are also
        # executable; write bits only on a writable mount and object.
        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        else:
            entry.st_mode |= stat.S_IFREG
            if isinstance(e, FuseArvadosFile):
                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

        if self.enable_write and e.writable():
            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

        entry.st_nlink = 1  # restored
        entry.st_uid = self.uid
        entry.st_gid = self.gid
        entry.st_rdev = 0  # restored

        entry.st_size = e.size()

        entry.st_blksize = 512
        # Floor division keeps the block count an integer on every Python.
        entry.st_blocks = (entry.st_size // 512) + 1
        if hasattr(entry, 'st_atime_ns'):
            # This llfuse exposes nanosecond-resolution timestamps.
            entry.st_atime_ns = int(e.atime() * 1000000000)
            entry.st_mtime_ns = int(e.mtime() * 1000000000)
            entry.st_ctime_ns = int(e.mtime() * 1000000000)
        else:
            # Whole-second timestamps.  atime/mtime are methods (see the
            # branch above) and must be called, not converted directly.
            entry.st_atime = int(e.atime())
            entry.st_mtime = int(e.mtime())
            entry.st_ctime = int(e.mtime())

        return entry

    @catch_exceptions
    def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
        """Apply a size change (truncate) to an Arvados file.

        Only the size is acted upon.  Returns the entry attributes, with
        st_size refreshed when a truncate happened.
        """
        entry = self.getattr(inode)

        if fh is not None and fh in self._filehandles:
            handle = self._filehandles[fh]
            e = handle.obj  # restored
        else:
            e = self.inodes[inode]

        if fields is None:
            # No `fields` argument supplied: a size change is signalled by
            # st_size being set on `attr`.
            update_size = attr.st_size is not None
        else:
            update_size = fields.update_size
        if update_size and isinstance(e, FuseArvadosFile):
            with llfuse.lock_released:
                e.arvfile.truncate(attr.st_size)
                entry.st_size = e.arvfile.size()

        return entry

    @catch_exceptions
    def lookup(self, parent_inode, name, ctx=None):
        """Resolve `name` inside `parent_inode` and return its attributes.

        Takes a reference on the found inode.  Raises FUSEError(ENOENT) when
        the name cannot be resolved.
        """
        name = unicode(name, self.inodes.encoding)
        inode = None

        if name == '.':
            inode = parent_inode
        else:
            if parent_inode in self.inodes:
                p = self.inodes[parent_inode]
                self.inodes.touch(p)  # restored
                if name == '..':
                    inode = p.parent_inode
                elif isinstance(p, Directory) and name in p:
                    inode = p[name].inode

        if inode is not None:
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                      parent_inode, name, inode)
            self.inodes[inode].inc_ref()
            return self.getattr(inode)
        else:
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
                      parent_inode, name)
            raise llfuse.FUSEError(errno.ENOENT)

    @catch_exceptions
    def forget(self, inodes):
        """Drop kernel references; delete entries that are dead and unreferenced."""
        if self._shutdown_started.is_set():
            return
        for inode, nlookup in inodes:
            ent = self.inodes[inode]
            _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
            if ent.dec_ref(nlookup) == 0 and ent.dead:
                self.inodes.del_entry(ent)

    @catch_exceptions
    def open(self, inode, flags, ctx=None):
        """Open a file and return a new file handle number.

        Raises FUSEError with ENOENT (unknown inode), EISDIR (inode is a
        directory) or EPERM (write access requested on a read-only file).
        """
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
            raise llfuse.FUSEError(errno.EPERM)

        fh = next(self._filehandles_counter)
        self._filehandles[fh] = FileHandle(fh, p)
        self.inodes.touch(p)  # restored

        # Normally, we will have received an "update" event if the
        # parent collection is stale here. However, even if the parent
        # collection hasn't changed, the manifest might have been
        # fetched so long ago that the signatures on the data block
        # locators have expired. Calling checkupdate() on all
        # ancestors ensures the signatures will be refreshed if
        # necessary.
        while p.parent_inode in self.inodes:
            if p == self.inodes[p.parent_inode]:
                # An entry that is its own parent is the top of the tree.
                break
            p = self.inodes[p.parent_inode]
            self.inodes.touch(p)  # restored
            p.checkupdate()  # restored

        _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)

        return fh

    @catch_exceptions
    def read(self, fh, off, size):
        """Read up to `size` bytes at offset `off`; EBADF for an unknown handle."""
        _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
        self.read_ops_counter.add(1)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        self.inodes.touch(handle.obj)

        r = handle.obj.readfrom(off, size, self.num_retries)
        if r:
            self.read_counter.add(len(r))
        return r

    @catch_exceptions
    def write(self, fh, off, buf):
        """Write `buf` at offset `off` and return the writeto() result.

        Raises FUSEError with EBADF (unknown handle) or EPERM (not writable).
        """
        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
        self.write_ops_counter.add(1)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        if not handle.obj.writable():
            raise llfuse.FUSEError(errno.EPERM)

        self.inodes.touch(handle.obj)

        w = handle.obj.writeto(off, buf, self.num_retries)
        if w:
            self.write_counter.add(w)
        return w

    @catch_exceptions
    def release(self, fh):
        """Flush and close a handle, then let the inode cache evict if needed."""
        if fh in self._filehandles:
            _logger.debug("arv-mount release fh %i", fh)
            try:
                self._filehandles[fh].flush()
            finally:
                # The handle is released even if the flush fails; the flush
                # error still propagates to the caller.
                self._filehandles[fh].release()
                del self._filehandles[fh]
        self.inodes.inode_cache.cap_cache()

    def releasedir(self, fh):
        self.release(fh)  # restored

    @catch_exceptions
    def opendir(self, inode, ctx=None):
        """Open a directory and return a handle holding a listing snapshot.

        Raises FUSEError with ENOENT (unknown inode), ENOTDIR (not a
        directory) or EIO (parent inode missing from the table).
        """
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = next(self._filehandles_counter)
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        # update atime
        self.inodes.touch(p)  # restored

        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
        return fh

    # NOTE(review): this is a generator, so the decorator only guards the
    # creation of the generator object; exceptions raised while it is being
    # iterated are not translated by catch_exceptions.
    @catch_exceptions
    def readdir(self, fh, off):
        """Yield (encoded name, attributes, next offset) from offset `off`.

        Entries whose inode is no longer in the table are skipped.
        """
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        e = off  # restored
        while e < len(handle.entries):
            if handle.entries[e][1].inode in self.inodes:
                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
            e += 1  # restored

    @catch_exceptions
    def statfs(self, ctx=None):
        """Return file system statistics (block size set, counts zero)."""
        st = llfuse.StatvfsData()
        st.f_bsize = 128 * 1024
        # NOTE(review): every field below was restored from a gap in the
        # listing -- confirm.
        st.f_blocks = 0
        st.f_files = 0

        st.f_bfree = 0
        st.f_bavail = 0

        st.f_ffree = 0
        st.f_favail = 0

        st.f_frsize = 0
        return st

    def _check_writable(self, inode_parent):
        """Return the directory for `inode_parent` if it may be modified.

        Raises FUSEError with EROFS (mount is read-only), ENOENT (unknown
        inode), ENOTDIR (not a directory) or EPERM (directory not writable).
        """
        if not self.enable_write:
            raise llfuse.FUSEError(errno.EROFS)

        if inode_parent in self.inodes:
            p = self.inodes[inode_parent]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        if not p.writable():  # restored
            raise llfuse.FUSEError(errno.EPERM)

        return p  # restored

    @catch_exceptions
    def create(self, inode_parent, name, mode, flags, ctx=None):
        """Create and open a file; return (file handle, entry attributes)."""
        _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)

        p = self._check_writable(inode_parent)
        p.create(name)  # restored

        # The file entry should have been implicitly created by callback.
        f = p[name]  # restored
        fh = next(self._filehandles_counter)
        self._filehandles[fh] = FileHandle(fh, f)
        self.inodes.touch(p)  # restored

        f.inc_ref()  # restored
        return (fh, self.getattr(f.inode))

    @catch_exceptions
    def mkdir(self, inode_parent, name, mode, ctx=None):
        """Create a directory and return its entry attributes."""
        _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)

        p = self._check_writable(inode_parent)
        p.mkdir(name)  # restored

        # The dir entry should have been implicitly created by callback.
        d = p[name]  # restored

        d.inc_ref()  # restored
        return self.getattr(d.inode)

    @catch_exceptions
    def unlink(self, inode_parent, name, ctx=None):
        _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
        p = self._check_writable(inode_parent)
        p.unlink(name)  # restored

    @catch_exceptions
    def rmdir(self, inode_parent, name, ctx=None):
        _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
        p = self._check_writable(inode_parent)
        p.rmdir(name)  # restored

    @catch_exceptions
    def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
        """Move an entry; both parent directories must be writable."""
        _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
        src = self._check_writable(inode_parent_old)
        dest = self._check_writable(inode_parent_new)
        dest.rename(name_old, name_new, src)

    @catch_exceptions
    def flush(self, fh):
        """Flush the handle if it is known; unknown handles are ignored."""
        if fh in self._filehandles:
            self._filehandles[fh].flush()

    def fsync(self, fh, datasync):
        self.flush(fh)  # restored

    def fsyncdir(self, fh, datasync):
        self.flush(fh)  # restored