1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 """FUSE driver for Arvados Keep
9 There is one `Operations` object per mount point. It is the entry point for all
10 read and write requests from the llfuse module.
12 The operations object owns an `Inodes` object. The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
16 The `Inodes` object owns an `InodeCache` object. The inode cache records the
17 memory footprint of file system objects and when they are last used. When the
18 cache limit is exceeded, the least recently used objects are cleared.
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
22 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
25 Directory objects inherit from `fusedir.Directory`. The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`. These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
32 The general FUSE operation flow is as follows:
34 - The request handler is called with either an inode or file handle that is the
35 subject of the operation.
37 - Look up the inode using the Inodes table or the file handle in the
38 filehandles table to get the file system object.
40 - For methods that alter files or directories, check that the operation is
41 valid and permitted using _check_writable().
43 - Call the relevant method on the file system object.
47 The FUSE driver supports the Arvados event bus. When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
77 # Default _notify_queue has a limit of 1000 items, but it really needs to be
78 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# (continuation of the note above is elided in this excerpt)
81 if hasattr(llfuse, 'capi'):
# Older llfuse releases expose the notify queue via the `capi` submodule;
# newer ones keep it directly on the llfuse module (the `else:` line is
# elided in this excerpt).  Either way, replace the bounded queue with an
# unbounded one.  NOTE(review): `Queue.Queue` is Python 2 only; under
# Python 3 this would need `queue.Queue`.
83 llfuse.capi._notify_queue = Queue.Queue()
86 llfuse._notify_queue = Queue.Queue()
88 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
89 from fusefile import StringFile, FuseArvadosFile
# Module-level logger for the FUSE driver; handlers/levels are configured
# by the mount entry point, not here.
91 _logger = logging.getLogger('arvados.arvados_fuse')
93 # Uncomment this to enable llfuse debug logging.
94 # log_handler = logging.StreamHandler()
95 # llogger = logging.getLogger('llfuse')
96 # llogger.addHandler(log_handler)
97 # llogger.setLevel(logging.DEBUG)
# NOTE(review): the `class Handle(object):` header line is elided in this
# excerpt; the docstring and constructor below belong to that base class,
# which FileHandle and DirectoryHandle both subclass.
100 """Connects a numeric file handle to a File or Directory object that has
101 been opened by the client."""
103 def __init__(self, fh, obj):
# Constructor body elided in this excerpt -- presumably stores fh and obj
# as attributes (self.obj is read by FileHandle.flush below); TODO confirm
# against the full source.
115 class FileHandle(Handle):
116 """Connects a numeric file handle to a File object that has
117 been opened by the client."""
# NOTE(review): the `def flush(self):` header is elided in this excerpt;
# the two lines below are its body: push buffered writes to the backend,
# but only when the underlying File object is writable.
120 if self.obj.writable():
121 return self.obj.flush()
124 class DirectoryHandle(Handle):
125 """Connects a numeric file handle to a Directory object that has
126 been opened by the client."""
128 def __init__(self, fh, dirobj, entries):
129 super(DirectoryHandle, self).__init__(fh, dirobj)
# Snapshot of directory entries taken at opendir() time (see
# Operations.opendir), so readdir() results stay stable for the lifetime
# of this handle even if the directory changes underneath.
130 self.entries = entries
133 class InodeCache(object):
134 """Records the memory footprint of objects and when they are last used.
136 When the cache limit is exceeded, the least recently used objects are
137 cleared. Clearing the object means discarding its contents to release
138 memory. The next time the object is accessed, it must be re-fetched from
139 the server. Note that the inode cache limit is a soft limit; the cache
140 limit may be exceeded if necessary to load very large objects, it may also
141 be exceeded if open file handles prevent objects from being cleared.
145 def __init__(self, cap, min_entries=4):
# OrderedDict gives LRU ordering: entries are re-inserted on touch() and
# iterated oldest-first when evicting over the cap.
146 self._entries = collections.OrderedDict()
# NOTE(review): initialization of self.cap, self._total and self._by_uuid
# is elided in this excerpt; the methods below read all three.
150 self.min_entries = min_entries
155 def _remove(self, obj, clear):
# Evict `obj` from the cache; when `clear` is true its contents are
# discarded as well.  NOTE(review): the guard lines preceding this debug
# message (detecting an in-use, unclearable object) are elided here.
158 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
# If the kernel still holds a reference, ask it to invalidate the inode
# so the entry can eventually be dropped.
160 if obj.has_ref(True):
161 obj.kernel_invalidate()
162 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
166 # The llfuse lock is released in del_entry(), which is called by
167 # Directory.clear(). While the llfuse lock is released, it can happen
168 # that a reentrant call removes this entry before this call gets to it.
169 # Ensure that the entry is still valid before trying to remove it.
170 if obj.inode not in self._entries:
# (early return elided in this excerpt)
173 self._total -= obj.cache_size
174 del self._entries[obj.inode]
# Keep the uuid -> objects reverse index consistent: drop this object
# and delete the list entirely once it is empty.
176 self._by_uuid[obj.cache_uuid].remove(obj)
177 if not self._by_uuid[obj.cache_uuid]:
178 del self._by_uuid[obj.cache_uuid]
179 obj.cache_uuid = None
181 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
# NOTE(review): the `def cap_cache(self):` header is elided in this
# excerpt.  The loop below evicts least-recently-used entries until the
# total drops under the cap, while keeping at least min_entries cached.
184 if self._total > self.cap:
185 for ent in self._entries.values():
186 if self._total < self.cap or len(self._entries) < self.min_entries:
# (break statement elided: stop evicting once under the cap or at the floor)
188 self._remove(ent, True)
190 def manage(self, obj):
# Start tracking `obj` in the cache.  NOTE(review): a guard line (orig 191)
# is elided here -- possibly a persisted() check; TODO confirm.
192 obj.cache_size = obj.objsize()
193 self._entries[obj.inode] = obj
194 obj.cache_uuid = obj.uuid()
# Maintain the uuid -> [objects] reverse index used by find_by_uuid();
# avoid duplicate entries for the same object.
196 if obj.cache_uuid not in self._by_uuid:
197 self._by_uuid[obj.cache_uuid] = [obj]
199 if obj not in self._by_uuid[obj.cache_uuid]:
200 self._by_uuid[obj.cache_uuid].append(obj)
201 self._total += obj.objsize()
202 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
205 def touch(self, obj):
# Mark `obj` most-recently-used: remove it (without clearing contents)
# so it can be re-inserted at the end of the LRU ordering.
# (the re-insert call, orig line 209, is elided -- TODO confirm it is manage())
207 if obj.inode in self._entries:
208 self._remove(obj, False)
211 def unmanage(self, obj):
# Fully evict a persisted object from the cache, clearing its contents.
212 if obj.persisted() and obj.inode in self._entries:
213 self._remove(obj, True)
215 def find_by_uuid(self, uuid):
# Return the list of live cached objects with this uuid ([] if none).
216 return self._by_uuid.get(uuid, [])
# NOTE(review): the `def clear(self):` header is elided in this excerpt;
# the two lines below empty both indexes without notifying the kernel.
219 self._entries.clear()
220 self._by_uuid.clear()
223 class Inodes(object):
224 """Manage the set of inodes. This is the mapping from a numeric id
225 to a concrete File or Directory object"""
227 def __init__(self, inode_cache, encoding="utf-8"):
# NOTE(review): initialization of self._entries (the inode -> object dict)
# is elided in this excerpt.  Inode numbers are handed out sequentially
# starting at the FUSE root inode.
229 self._counter = itertools.count(llfuse.ROOT_INODE)
230 self.inode_cache = inode_cache
231 self.encoding = encoding
232 self.deferred_invalidations = []
234 def __getitem__(self, item):
235 return self._entries[item]
237 def __setitem__(self, key, item):
238 self._entries[key] = item
# NOTE(review): the `def __iter__(self):` header is elided in this
# excerpt.  iterkeys() is Python 2 only; Python 3 would need keys().
241 return self._entries.iterkeys()
# NOTE(review): the `def items(self):` header is elided in this excerpt.
244 return self._entries.items()
246 def __contains__(self, k):
247 return k in self._entries
249 def touch(self, entry):
# Record access time and bump the entry toward most-recently-used in the
# inode cache.
250 entry._atime = time.time()
251 self.inode_cache.touch(entry)
253 def add_entry(self, entry):
# Assign the next inode number and register the object.
254 entry.inode = next(self._counter)
255 if entry.inode == llfuse.ROOT_INODE:
# (body of the root-inode special case, orig 256, is elided)
257 self._entries[entry.inode] = entry
258 self.inode_cache.manage(entry)
# (a `return entry` is likely elided here -- TODO confirm)
261 def del_entry(self, entry):
# Remove an entry once the kernel no longer references it; otherwise the
# elided else-branch presumably defers deletion (TODO confirm).
262 if entry.ref_count == 0:
263 self.inode_cache.unmanage(entry)
264 del self._entries[entry.inode]
# Kernel notification can block, so do it with the llfuse lock released.
265 with llfuse.lock_released:
267 self.invalidate_inode(entry.inode)
271 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
273 def invalidate_inode(self, inode):
274 llfuse.invalidate_inode(inode)
276 def invalidate_entry(self, inode, name):
# llfuse expects the directory entry name as bytes, hence the encode.
277 llfuse.invalidate_entry(inode, name.encode(self.encoding))
# NOTE(review): the `def clear(self):` header is elided in this excerpt.
# Drops the whole inode table, logging (not raising) per-entry failures.
280 self.inode_cache.clear()
282 for k,v in self._entries.items():
# (the try: and per-entry finalization call, orig 283-284, are elided)
285 except Exception as e:
286 _logger.exception("Error during finalize of inode %i", k)
288 self._entries.clear()
291 def catch_exceptions(orig_func):
292 """Catch uncaught exceptions and log them consistently.
	
	Decorator for FUSE request handlers: translates OS and Arvados errors
	into llfuse.FUSEError errno results so the kernel gets a sane failure
	instead of a Python traceback.
	"""
294 @functools.wraps(orig_func)
295 def catch_exceptions_wrapper(self, *args, **kwargs):
# (the `try:` line, orig 296, is elided in this excerpt)
297 return orig_func(self, *args, **kwargs)
# Already in the right form; re-raise unchanged (bare `raise`, orig 299,
# is elided).
298 except llfuse.FUSEError:
300 except EnvironmentError as e:
# OS-level errors map directly onto FUSE errno results.
301 raise llfuse.FUSEError(e.errno)
302 except arvados.errors.KeepWriteError as e:
303 _logger.error("Keep write error: " + str(e))
304 raise llfuse.FUSEError(errno.EIO)
305 except arvados.errors.NotFoundError as e:
306 _logger.error("Block not found error: " + str(e))
307 raise llfuse.FUSEError(errno.EIO)
# (the catch-all `except Exception:` line, orig 308, is elided) anything
# else is logged with a traceback and surfaced to the kernel as EIO.
309 _logger.exception("Unhandled exception during FUSE operation")
310 raise llfuse.FUSEError(errno.EIO)
312 return catch_exceptions_wrapper
315 class Operations(llfuse.Operations):
316 """This is the main interface with llfuse.
318 The methods on this object are called by llfuse threads to service FUSE
319 events to query and read from the file system.
321 llfuse has its own global lock which is acquired before calling a request handler,
322 so request handlers do not run concurrently unless the lock is explicitly released
323 using 'with llfuse.lock_released:'
327 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
328 super(Operations, self).__init__()
330 self._api_client = api_client
# Default soft cap for the inode cache: 256 MiB.  (The `if inode_cache is
# None:` style guard line is elided in this excerpt.)
333 inode_cache = InodeCache(cap=256*1024*1024)
334 self.inodes = Inodes(inode_cache, encoding=encoding)
# NOTE(review): assignment of self.uid/self.gid (read by getattr below)
# is elided in this excerpt.
337 self.enable_write = enable_write
339 # dict of inode to filehandle
340 self._filehandles = {}
341 self._filehandles_counter = itertools.count(0)
343 # Other threads that need to wait until the fuse driver
344 # is fully initialized should wait() on this event object.
345 self.initlock = threading.Event()
347 # If we get overlapping shutdown events (e.g., fusermount -u
348 # -z and operations.destroy()) llfuse calls forget() on inodes
349 # that have already been deleted. To avoid this, we make
350 # forget() a no-op if called after destroy().
351 self._shutdown_started = threading.Event()
353 self.num_retries = num_retries
# Byte/operation counters for read/write statistics reporting.
355 self.read_counter = arvados.keep.Counter()
356 self.write_counter = arvados.keep.Counter()
357 self.read_ops_counter = arvados.keep.Counter()
358 self.write_ops_counter = arvados.keep.Counter()
# NOTE(review): the init() and destroy() method headers around the lines
# below are elided in this excerpt.
363 # Allow threads that are waiting for the driver to be finished
364 # initializing to continue
369 self._shutdown_started.set()
# Briefly take/drop the llfuse lock -- presumably to serialize shutdown
# against in-flight request handlers; TODO confirm against full source.
374 if llfuse.lock.acquire():
377 llfuse.lock.release()
382 def access(self, inode, mode, ctx):
# (body elided in this excerpt)
385 def listen_for_events(self):
# Subscribe to the Arvados event bus so cached objects are refreshed as
# soon as they change server-side (see on_event below).
386 self.events = arvados.events.subscribe(
388 [["event_type", "in", ["create", "update", "delete"]]],
392 def on_event(self, ev):
# Event-bus callback: update any cached objects affected by this event.
393 if 'event_type' not in ev:
# (early return for malformed events elided)
396 new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
397 pdh = new_attrs.get("portable_data_hash")
398 # new_attributes.modified_at currently lacks
399 # subsecond precision (see #6347) so use event_at
400 # which should always be the same.
401 stamp = ev.get("event_at")
403 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
405 if stamp and pdh and ev.get("object_kind") == "arvados#collection":
# Collections can be fast-forwarded to a known (timestamp, pdh) version
# without a full refetch.
406 item.update(to_record_version=(stamp, pdh))
# (else-branch doing a plain update is elided in this excerpt)
# An ownership change means both the old and the new owner's project
# directories need refreshing so the object moves between them.
410 oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
411 newowner = ev.get("object_owner_uuid")
# (the `for parent in (` line introducing this lookup is elided)
413 self.inodes.inode_cache.find_by_uuid(oldowner) +
414 self.inodes.inode_cache.find_by_uuid(newowner)):
419 def getattr(self, inode, ctx=None):
420 if inode not in self.inodes:
421 raise llfuse.FUSEError(errno.ENOENT)
423 e = self.inodes[inode]
425 entry = llfuse.EntryAttributes()
# Let the kernel cache dirents/attrs for a minute when the object says
# that is safe; otherwise disable kernel caching entirely.
428 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
429 entry.attr_timeout = 60 if e.allow_attr_cache else 0
# Everything is world-readable.  Directories get execute (search) bits;
# Arvados files also get execute bits so they can be run as scripts.
431 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
432 if isinstance(e, Directory):
433 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
435 entry.st_mode |= stat.S_IFREG
436 if isinstance(e, FuseArvadosFile):
437 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
# Write bits only appear when the mount is writable AND the object is.
439 if self.enable_write and e.writable():
440 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
443 entry.st_uid = self.uid
444 entry.st_gid = self.gid
447 entry.st_size = e.size()
449 entry.st_blksize = 512
450 entry.st_blocks = (entry.st_size/512)+1
# llfuse >= 0.42 exposes nanosecond timestamp fields.
451 if hasattr(entry, 'st_atime_ns'):
453 entry.st_atime_ns = int(e.atime() * 1000000000)
454 entry.st_mtime_ns = int(e.mtime() * 1000000000)
455 entry.st_ctime_ns = int(e.mtime() * 1000000000)
# Fallback: whole-second fields on older llfuse.
# NOTE(review): the branch above calls e.atime()/e.mtime(); the lines
# below use them without parentheses -- one of the two looks like a bug,
# TODO confirm whether atime/mtime are methods or attributes.
458 entry.st_atime = int(e.atime)
459 entry.st_mtime = int(e.mtime)
460 entry.st_ctime = int(e.mtime)
465 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
466 entry = self.getattr(inode)
# Prefer the object behind an open file handle when one was supplied.
468 if fh is not None and fh in self._filehandles:
469 handle = self._filehandles[fh]
# (the else-branch falling back to the inode table is partially elided)
472 e = self.inodes[inode]
# Older llfuse passes fields=None and signals "unchanged" attributes as
# None values in `attr`; newer llfuse passes an explicit fields object.
476 update_size = attr.st_size is not None
479 update_size = fields.update_size
480 if update_size and isinstance(e, FuseArvadosFile):
# Truncation can block on Keep I/O, so release the global llfuse lock.
481 with llfuse.lock_released:
482 e.arvfile.truncate(attr.st_size)
483 entry.st_size = e.arvfile.size()
488 def lookup(self, parent_inode, name, ctx=None):
# NOTE(review): unicode() is Python 2 only; a py3 port would decode bytes
# explicitly here.
489 name = unicode(name, self.inodes.encoding)
495 if parent_inode in self.inodes:
496 p = self.inodes[parent_inode]
# ('.'/'..' special cases are partially elided; '..' resolves to the
# parent directory's inode below)
499 inode = p.parent_inode
500 elif isinstance(p, Directory) and name in p:
501 inode = p[name].inode
504 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
505 parent_inode, name, inode)
# A successful lookup hands the kernel a reference; balanced by forget().
506 self.inodes[inode].inc_ref()
507 return self.getattr(inode)
509 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
511 raise llfuse.FUSEError(errno.ENOENT)
514 def forget(self, inodes):
# No-op during shutdown; see the overlapping-shutdown comment in __init__.
515 if self._shutdown_started.is_set():
517 for inode, nlookup in inodes:
518 ent = self.inodes[inode]
519 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
# Delete the entry once the kernel holds no more references and the
# object has already been marked dead.
520 if ent.dec_ref(nlookup) == 0 and ent.dead:
521 self.inodes.del_entry(ent)
524 def open(self, inode, flags, ctx=None):
525 if inode in self.inodes:
526 p = self.inodes[inode]
# (the `else:` introducing this ENOENT is elided)
528 raise llfuse.FUSEError(errno.ENOENT)
530 if isinstance(p, Directory):
531 raise llfuse.FUSEError(errno.EISDIR)
# Reject write-mode opens of read-only objects up front.
533 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
534 raise llfuse.FUSEError(errno.EPERM)
536 fh = next(self._filehandles_counter)
537 self._filehandles[fh] = FileHandle(fh, p)
540 # Normally, we will have received an "update" event if the
541 # parent collection is stale here. However, even if the parent
542 # collection hasn't changed, the manifest might have been
543 # fetched so long ago that the signatures on the data block
544 # locators have expired. Calling checkupdate() on all
545 # ancestors ensures the signatures will be refreshed if
# Walk up the directory tree; the break at the self-parented root (and
# the checkupdate() call itself) are elided in this excerpt.
547 while p.parent_inode in self.inodes:
548 if p == self.inodes[p.parent_inode]:
550 p = self.inodes[p.parent_inode]
554 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
# (return fh elided)
559 def read(self, fh, off, size):
560 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
561 self.read_ops_counter.add(1)
563 if fh in self._filehandles:
564 handle = self._filehandles[fh]
# (the `else:` introducing this EBADF is elided)
566 raise llfuse.FUSEError(errno.EBADF)
# Reads count as access for LRU purposes.
568 self.inodes.touch(handle.obj)
570 r = handle.obj.readfrom(off, size, self.num_retries)
572 self.read_counter.add(len(r))
# (return r elided)
576 def write(self, fh, off, buf):
577 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
578 self.write_ops_counter.add(1)
580 if fh in self._filehandles:
581 handle = self._filehandles[fh]
583 raise llfuse.FUSEError(errno.EBADF)
585 if not handle.obj.writable():
586 raise llfuse.FUSEError(errno.EPERM)
588 self.inodes.touch(handle.obj)
590 w = handle.obj.writeto(off, buf, self.num_retries)
592 self.write_counter.add(w)
# (return w elided)
596 def release(self, fh):
597 if fh in self._filehandles:
598 _logger.debug("arv-mount release fh %i", fh)
# Flush first; the try/except around this call (orig 599-603) is elided.
600 self._filehandles[fh].flush()
604 self._filehandles[fh].release()
605 del self._filehandles[fh]
# Closing handles may free objects for eviction, so re-check the cap.
606 self.inodes.inode_cache.cap_cache()
608 def releasedir(self, fh):
# (body elided -- presumably delegates to release(); TODO confirm)
612 def opendir(self, inode, ctx=None):
613 _logger.debug("arv-mount opendir: inode %i", inode)
615 if inode in self.inodes:
616 p = self.inodes[inode]
618 raise llfuse.FUSEError(errno.ENOENT)
620 if not isinstance(p, Directory):
621 raise llfuse.FUSEError(errno.ENOTDIR)
623 fh = next(self._filehandles_counter)
624 if p.parent_inode in self.inodes:
625 parent = self.inodes[p.parent_inode]
# A directory without a live parent is an internal inconsistency -> EIO.
627 raise llfuse.FUSEError(errno.EIO)
# Snapshot the directory contents (plus '.' and '..') now, so readdir()
# sees a stable listing for the lifetime of this handle.
632 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
# (return fh elided)
636 def readdir(self, fh, off):
637 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
639 if fh in self._filehandles:
640 handle = self._filehandles[fh]
642 raise llfuse.FUSEError(errno.EBADF)
# Iterate the opendir() snapshot starting at `off`, skipping entries whose
# inode has since been deleted.  (initialization `e = off` and the loop
# increment are elided in this excerpt)
645 while e < len(handle.entries):
646 if handle.entries[e][1].inode in self.inodes:
# Yield (encoded name, attributes, next offset) as llfuse expects.
647 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
651 def statfs(self, ctx=None):
652 st = llfuse.StatvfsData()
653 st.f_bsize = 128 * 1024
# (remaining statvfs fields and the return are elided in this excerpt)
666 def _check_writable(self, inode_parent):
# Shared validation for all mutating operations: the mount must be
# writable; the parent must exist, be a directory, and be writable.
667 if not self.enable_write:
668 raise llfuse.FUSEError(errno.EROFS)
670 if inode_parent in self.inodes:
671 p = self.inodes[inode_parent]
673 raise llfuse.FUSEError(errno.ENOENT)
675 if not isinstance(p, Directory):
676 raise llfuse.FUSEError(errno.ENOTDIR)
# (the writability condition guarding this EPERM, and the `return p`, are
# elided in this excerpt)
679 raise llfuse.FUSEError(errno.EPERM)
684 def create(self, inode_parent, name, mode, flags, ctx=None):
685 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
687 p = self._check_writable(inode_parent)
# (the call creating the file in p is elided)
690 # The file entry should have been implicitly created by callback.
# (lookup of `f` from p and its inc_ref() are elided -- TODO confirm)
692 fh = next(self._filehandles_counter)
693 self._filehandles[fh] = FileHandle(fh, f)
697 return (fh, self.getattr(f.inode))
700 def mkdir(self, inode_parent, name, mode, ctx=None):
701 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
703 p = self._check_writable(inode_parent)
# (the call creating the subdirectory in p is elided)
706 # The dir entry should have been implicitly created by callback.
# (lookup of `d` from p and its inc_ref() are elided -- TODO confirm)
710 return self.getattr(d.inode)
713 def unlink(self, inode_parent, name, ctx=None):
714 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
715 p = self._check_writable(inode_parent)
# (the p.unlink(name)-style call is elided in this excerpt)
719 def rmdir(self, inode_parent, name, ctx=None):
720 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
721 p = self._check_writable(inode_parent)
# (the p.rmdir(name)-style call is elided in this excerpt)
725 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
726 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
727 src = self._check_writable(inode_parent_old)
728 dest = self._check_writable(inode_parent_new)
# The destination directory performs the move, pulling from src.
729 dest.rename(name_old, name_new, src)
# NOTE(review): the `def flush(self, fh):` header is elided in this
# excerpt; the two lines below are its body.
733 if fh in self._filehandles:
734 self._filehandles[fh].flush()
736 def fsync(self, fh, datasync):
# (body elided -- presumably delegates to flush; TODO confirm)
739 def fsyncdir(self, fh, datasync):
# (body elided in this excerpt)