1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they were last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
77 if hasattr(llfuse, 'capi'):
79 llfuse.capi._notify_queue = Queue.Queue()
82 llfuse._notify_queue = Queue.Queue()
84 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
85 from fusefile import StringFile, FuseArvadosFile
87 _logger = logging.getLogger('arvados.arvados_fuse')
89 # Uncomment this to enable llfuse debug logging.
90 # log_handler = logging.StreamHandler()
91 # llogger = logging.getLogger('llfuse')
92 # llogger.addHandler(log_handler)
93 # llogger.setLevel(logging.DEBUG)
96 """Connects a numeric file handle to a File or Directory object that has
97 been opened by the client."""
99 def __init__(self, fh, obj):
108 if self.obj.writable():
109 return self.obj.flush()
112 class FileHandle(Handle):
113 """Connects a numeric file handle to a File object that has
114 been opened by the client."""
class DirectoryHandle(Handle):
    """Handle for a Directory object that a client has opened.

    In addition to what the base Handle keeps for the numeric file handle
    and the opened object, a DirectoryHandle remembers the `entries` value
    it was created with.
    """

    def __init__(self, fh, dirobj, entries):
        # Hand the handle number and the directory object to Handle first,
        # then record the entries supplied by the caller.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
127 class InodeCache(object):
128 """Records the memory footprint of objects and when they are last used.
130 When the cache limit is exceeded, the least recently used objects are
131 cleared. Clearing the object means discarding its contents to release
132 memory. The next time the object is accessed, it must be re-fetched from
133 the server. Note that the inode cache limit is a soft limit; the cache
134 limit may be exceeded if necessary to load very large objects, and it may also
135 be exceeded if open file handles prevent objects from being cleared.
139 def __init__(self, cap, min_entries=4):
140 self._entries = collections.OrderedDict()
144 self.min_entries = min_entries
149 def _remove(self, obj, clear):
152 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
154 if obj.has_ref(True):
155 obj.kernel_invalidate()
156 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
160 # The llfuse lock is released in del_entry(), which is called by
161 # Directory.clear(). While the llfuse lock is released, it can happen
162 # that a reentrant call removes this entry before this call gets to it.
163 # Ensure that the entry is still valid before trying to remove it.
164 if obj.inode not in self._entries:
167 self._total -= obj.cache_size
168 del self._entries[obj.inode]
170 self._by_uuid[obj.cache_uuid].remove(obj)
171 if not self._by_uuid[obj.cache_uuid]:
172 del self._by_uuid[obj.cache_uuid]
173 obj.cache_uuid = None
175 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
178 if self._total > self.cap:
179 for ent in self._entries.values():
180 if self._total < self.cap or len(self._entries) < self.min_entries:
182 self._remove(ent, True)
184 def manage(self, obj):
186 obj.cache_size = obj.objsize()
187 self._entries[obj.inode] = obj
188 obj.cache_uuid = obj.uuid()
190 if obj.cache_uuid not in self._by_uuid:
191 self._by_uuid[obj.cache_uuid] = [obj]
193 if obj not in self._by_uuid[obj.cache_uuid]:
194 self._by_uuid[obj.cache_uuid].append(obj)
195 self._total += obj.objsize()
196 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
199 def touch(self, obj):
201 if obj.inode in self._entries:
202 self._remove(obj, False)
def unmanage(self, obj):
    """Remove `obj` from the cache bookkeeping, clearing it.

    Nothing happens unless `obj.persisted()` is true and `obj.inode` is
    currently a key of the entries table; in that case the object is
    handed to `_remove()` with clear=True.
    """
    if not obj.persisted():
        return
    if obj.inode not in self._entries:
        return
    self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return the list of cached objects recorded under `uuid`.

    When nothing is recorded for `uuid`, a new empty list is returned.
    """
    if uuid in self._by_uuid:
        return self._by_uuid[uuid]
    return []
213 self._entries.clear()
214 self._by_uuid.clear()
217 class Inodes(object):
218 """Manage the set of inodes. This is the mapping from a numeric id
219 to a concrete File or Directory object"""
221 def __init__(self, inode_cache, encoding="utf-8"):
223 self._counter = itertools.count(llfuse.ROOT_INODE)
224 self.inode_cache = inode_cache
225 self.encoding = encoding
226 self.deferred_invalidations = []
228 def __getitem__(self, item):
229 return self._entries[item]
231 def __setitem__(self, key, item):
232 self._entries[key] = item
235 return self._entries.iterkeys()
238 return self._entries.items()
240 def __contains__(self, k):
241 return k in self._entries
def touch(self, entry):
    """Mark `entry` as just used.

    Stamps `entry._atime` with the current `time.time()` value and then
    passes the entry to the inode cache's `touch()`.
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
247 def add_entry(self, entry):
248 entry.inode = next(self._counter)
249 if entry.inode == llfuse.ROOT_INODE:
251 self._entries[entry.inode] = entry
252 self.inode_cache.manage(entry)
255 def del_entry(self, entry):
256 if entry.ref_count == 0:
257 self.inode_cache.unmanage(entry)
258 del self._entries[entry.inode]
259 with llfuse.lock_released:
261 self.invalidate_inode(entry.inode)
265 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Forward an invalidation request for `inode` to llfuse.

    This is a thin wrapper around `llfuse.invalidate_inode`.
    """
    notify = llfuse.invalidate_inode
    notify(inode)
def invalidate_entry(self, inode, name):
    """Forward an invalidation request for entry `name` under `inode`.

    `name` is encoded with this object's configured `encoding` before
    being passed to `llfuse.invalidate_entry`.
    """
    notify = llfuse.invalidate_entry
    encoded_name = name.encode(self.encoding)
    notify(inode, encoded_name)
274 self.inode_cache.clear()
276 for k,v in self._entries.items():
279 except Exception as e:
280 _logger.exception("Error during finalize of inode %i", k)
282 self._entries.clear()
285 def catch_exceptions(orig_func):
286 """Catch uncaught exceptions and log them consistently."""
288 @functools.wraps(orig_func)
289 def catch_exceptions_wrapper(self, *args, **kwargs):
291 return orig_func(self, *args, **kwargs)
292 except llfuse.FUSEError:
294 except EnvironmentError as e:
295 raise llfuse.FUSEError(e.errno)
296 except arvados.errors.KeepWriteError as e:
297 _logger.error("Keep write error: " + str(e))
298 raise llfuse.FUSEError(errno.EIO)
299 except arvados.errors.NotFoundError as e:
300 _logger.error("Block not found error: " + str(e))
301 raise llfuse.FUSEError(errno.EIO)
303 _logger.exception("Unhandled exception during FUSE operation")
304 raise llfuse.FUSEError(errno.EIO)
306 return catch_exceptions_wrapper
309 class Operations(llfuse.Operations):
310 """This is the main interface with llfuse.
312 The methods on this object are called by llfuse threads to service FUSE
313 events to query and read from the file system.
315 llfuse has its own global lock which is acquired before calling a request handler,
316 so request handlers do not run concurrently unless the lock is explicitly released
317 using 'with llfuse.lock_released:'
321 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
322 super(Operations, self).__init__()
324 self._api_client = api_client
327 inode_cache = InodeCache(cap=256*1024*1024)
328 self.inodes = Inodes(inode_cache, encoding=encoding)
331 self.enable_write = enable_write
333 # dict of numeric file handle to Handle object
334 self._filehandles = {}
335 self._filehandles_counter = itertools.count(0)
337 # Other threads that need to wait until the fuse driver
338 # is fully initialized should wait() on this event object.
339 self.initlock = threading.Event()
341 # If we get overlapping shutdown events (e.g., fusermount -u
342 # -z and operations.destroy()) llfuse calls forget() on inodes
343 # that have already been deleted. To avoid this, we make
344 # forget() a no-op if called after destroy().
345 self._shutdown_started = threading.Event()
347 self.num_retries = num_retries
349 self.read_counter = arvados.keep.Counter()
350 self.write_counter = arvados.keep.Counter()
351 self.read_ops_counter = arvados.keep.Counter()
352 self.write_ops_counter = arvados.keep.Counter()
357 # Allow threads that are waiting for the driver to finish
358 # initializing to continue.
363 self._shutdown_started.set()
368 if llfuse.lock.acquire():
371 llfuse.lock.release()
376 def access(self, inode, mode, ctx):
379 def listen_for_events(self):
380 self.events = arvados.events.subscribe(
382 [["event_type", "in", ["create", "update", "delete"]]],
386 def on_event(self, ev):
387 if 'event_type' not in ev:
390 new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
391 pdh = new_attrs.get("portable_data_hash")
392 # new_attributes.modified_at currently lacks
393 # subsecond precision (see #6347) so use event_at
394 # which should always be the same.
395 stamp = ev.get("event_at")
397 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
399 if stamp and pdh and ev.get("object_kind") == "arvados#collection":
400 item.update(to_record_version=(stamp, pdh))
404 oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
405 newowner = ev.get("object_owner_uuid")
407 self.inodes.inode_cache.find_by_uuid(oldowner) +
408 self.inodes.inode_cache.find_by_uuid(newowner)):
413 def getattr(self, inode, ctx=None):
414 if inode not in self.inodes:
415 raise llfuse.FUSEError(errno.ENOENT)
417 e = self.inodes[inode]
419 entry = llfuse.EntryAttributes()
422 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
423 entry.attr_timeout = 60 if e.allow_attr_cache else 0
425 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
426 if isinstance(e, Directory):
427 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
429 entry.st_mode |= stat.S_IFREG
430 if isinstance(e, FuseArvadosFile):
431 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
433 if self.enable_write and e.writable():
434 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
437 entry.st_uid = self.uid
438 entry.st_gid = self.gid
441 entry.st_size = e.size()
443 entry.st_blksize = 512
444 entry.st_blocks = (entry.st_size/512)+1
445 if hasattr(entry, 'st_atime_ns'):
447 entry.st_atime_ns = int(e.atime() * 1000000000)
448 entry.st_mtime_ns = int(e.mtime() * 1000000000)
449 entry.st_ctime_ns = int(e.mtime() * 1000000000)
452 entry.st_atime = int(e.atime)
453 entry.st_mtime = int(e.mtime)
454 entry.st_ctime = int(e.mtime)
459 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
460 entry = self.getattr(inode)
462 if fh is not None and fh in self._filehandles:
463 handle = self._filehandles[fh]
466 e = self.inodes[inode]
470 update_size = attr.st_size is not None
473 update_size = fields.update_size
474 if update_size and isinstance(e, FuseArvadosFile):
475 with llfuse.lock_released:
476 e.arvfile.truncate(attr.st_size)
477 entry.st_size = e.arvfile.size()
482 def lookup(self, parent_inode, name, ctx=None):
483 name = unicode(name, self.inodes.encoding)
489 if parent_inode in self.inodes:
490 p = self.inodes[parent_inode]
493 inode = p.parent_inode
494 elif isinstance(p, Directory) and name in p:
495 inode = p[name].inode
498 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
499 parent_inode, name, inode)
500 self.inodes[inode].inc_ref()
501 return self.getattr(inode)
503 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
505 raise llfuse.FUSEError(errno.ENOENT)
508 def forget(self, inodes):
509 if self._shutdown_started.is_set():
511 for inode, nlookup in inodes:
512 ent = self.inodes[inode]
513 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
514 if ent.dec_ref(nlookup) == 0 and ent.dead:
515 self.inodes.del_entry(ent)
518 def open(self, inode, flags, ctx=None):
519 if inode in self.inodes:
520 p = self.inodes[inode]
522 raise llfuse.FUSEError(errno.ENOENT)
524 if isinstance(p, Directory):
525 raise llfuse.FUSEError(errno.EISDIR)
527 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
528 raise llfuse.FUSEError(errno.EPERM)
530 fh = next(self._filehandles_counter)
531 self._filehandles[fh] = FileHandle(fh, p)
534 # Normally, we will have received an "update" event if the
535 # parent collection is stale here. However, even if the parent
536 # collection hasn't changed, the manifest might have been
537 # fetched so long ago that the signatures on the data block
538 # locators have expired. Calling checkupdate() on all
539 # ancestors ensures the signatures will be refreshed if
541 while p.parent_inode in self.inodes:
542 if p == self.inodes[p.parent_inode]:
544 p = self.inodes[p.parent_inode]
548 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
553 def read(self, fh, off, size):
554 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
555 self.read_ops_counter.add(1)
557 if fh in self._filehandles:
558 handle = self._filehandles[fh]
560 raise llfuse.FUSEError(errno.EBADF)
562 self.inodes.touch(handle.obj)
564 r = handle.obj.readfrom(off, size, self.num_retries)
566 self.read_counter.add(len(r))
570 def write(self, fh, off, buf):
571 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
572 self.write_ops_counter.add(1)
574 if fh in self._filehandles:
575 handle = self._filehandles[fh]
577 raise llfuse.FUSEError(errno.EBADF)
579 if not handle.obj.writable():
580 raise llfuse.FUSEError(errno.EPERM)
582 self.inodes.touch(handle.obj)
584 w = handle.obj.writeto(off, buf, self.num_retries)
586 self.write_counter.add(w)
590 def release(self, fh):
591 if fh in self._filehandles:
593 self._filehandles[fh].flush()
597 self._filehandles[fh].release()
598 del self._filehandles[fh]
599 self.inodes.inode_cache.cap_cache()
601 def releasedir(self, fh):
605 def opendir(self, inode, ctx=None):
606 _logger.debug("arv-mount opendir: inode %i", inode)
608 if inode in self.inodes:
609 p = self.inodes[inode]
611 raise llfuse.FUSEError(errno.ENOENT)
613 if not isinstance(p, Directory):
614 raise llfuse.FUSEError(errno.ENOTDIR)
616 fh = next(self._filehandles_counter)
617 if p.parent_inode in self.inodes:
618 parent = self.inodes[p.parent_inode]
620 raise llfuse.FUSEError(errno.EIO)
625 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
629 def readdir(self, fh, off):
630 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
632 if fh in self._filehandles:
633 handle = self._filehandles[fh]
635 raise llfuse.FUSEError(errno.EBADF)
638 while e < len(handle.entries):
639 if handle.entries[e][1].inode in self.inodes:
640 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
644 def statfs(self, ctx=None):
645 st = llfuse.StatvfsData()
646 st.f_bsize = 128 * 1024
659 def _check_writable(self, inode_parent):
660 if not self.enable_write:
661 raise llfuse.FUSEError(errno.EROFS)
663 if inode_parent in self.inodes:
664 p = self.inodes[inode_parent]
666 raise llfuse.FUSEError(errno.ENOENT)
668 if not isinstance(p, Directory):
669 raise llfuse.FUSEError(errno.ENOTDIR)
672 raise llfuse.FUSEError(errno.EPERM)
677 def create(self, inode_parent, name, mode, flags, ctx=None):
678 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
680 p = self._check_writable(inode_parent)
683 # The file entry should have been implicitly created by callback.
685 fh = next(self._filehandles_counter)
686 self._filehandles[fh] = FileHandle(fh, f)
690 return (fh, self.getattr(f.inode))
693 def mkdir(self, inode_parent, name, mode, ctx=None):
694 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
696 p = self._check_writable(inode_parent)
699 # The dir entry should have been implicitly created by callback.
703 return self.getattr(d.inode)
706 def unlink(self, inode_parent, name, ctx=None):
707 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
708 p = self._check_writable(inode_parent)
712 def rmdir(self, inode_parent, name, ctx=None):
713 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
714 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
    """Handle a FUSE rename request.

    Both parent inodes are passed through `_check_writable()`, which
    raises `llfuse.FUSEError` when the operation is not allowed. The
    old parent is checked first, then the new one. The rename itself is
    carried out by the `rename()` method of the object returned for the
    new parent, which receives the old name, the new name, and the
    object returned for the old parent.
    """
    _logger.debug(
        "arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'",
        inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
726 if fh in self._filehandles:
727 self._filehandles[fh].flush()
729 def fsync(self, fh, datasync):
732 def fsyncdir(self, fh, datasync):