1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for details.
77 llfuse._notify_queue = Queue.Queue()
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
82 _logger = logging.getLogger('arvados.arvados_fuse')
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
91 """Connects a numeric file handle to a File or Directory object that has
92 been opened by the client."""
94 def __init__(self, fh, obj):
103 if self.obj.writable():
104 return self.obj.flush()
107 class FileHandle(Handle):
108 """Connects a numeric file handle to a File object that has
109 been opened by the client."""
class DirectoryHandle(Handle):
    """Maps a numeric file handle to an open Directory object.

    Besides the directory object itself, this holds the snapshot of
    directory entries taken when the directory was opened, so readdir()
    can iterate over a stable listing.
    """

    def __init__(self, fh, dirobj, entries):
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
122 class InodeCache(object):
123 """Records the memory footprint of objects and when they are last used.
125 When the cache limit is exceeded, the least recently used objects are
126 cleared. Clearing the object means discarding its contents to release
127 memory. The next time the object is accessed, it must be re-fetched from
128 the server. Note that the inode cache limit is a soft limit; the cache
129 limit may be exceeded if necessary to load very large objects, it may also
130 be exceeded if open file handles prevent objects from being cleared.
134 def __init__(self, cap, min_entries=4):
135 self._entries = collections.OrderedDict()
139 self.min_entries = min_entries
144 def _remove(self, obj, clear):
147 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
149 if obj.has_ref(True):
150 obj.kernel_invalidate()
151 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
155 # The llfuse lock is released in del_entry(), which is called by
156 # Directory.clear(). While the llfuse lock is released, it can happen
157 # that a reentrant call removes this entry before this call gets to it.
158 # Ensure that the entry is still valid before trying to remove it.
159 if obj.inode not in self._entries:
162 self._total -= obj.cache_size
163 del self._entries[obj.inode]
165 self._by_uuid[obj.cache_uuid].remove(obj)
166 if not self._by_uuid[obj.cache_uuid]:
167 del self._by_uuid[obj.cache_uuid]
168 obj.cache_uuid = None
170 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
173 if self._total > self.cap:
174 for ent in self._entries.values():
175 if self._total < self.cap or len(self._entries) < self.min_entries:
177 self._remove(ent, True)
179 def manage(self, obj):
181 obj.cache_size = obj.objsize()
182 self._entries[obj.inode] = obj
183 obj.cache_uuid = obj.uuid()
185 if obj.cache_uuid not in self._by_uuid:
186 self._by_uuid[obj.cache_uuid] = [obj]
188 if obj not in self._by_uuid[obj.cache_uuid]:
189 self._by_uuid[obj.cache_uuid].append(obj)
190 self._total += obj.objsize()
191 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
194 def touch(self, obj):
196 if obj.inode in self._entries:
197 self._remove(obj, False)
200 def unmanage(self, obj):
201 if obj.persisted() and obj.inode in self._entries:
202 self._remove(obj, True)
204 def find_by_uuid(self, uuid):
205 return self._by_uuid.get(uuid, [])
208 self._entries.clear()
209 self._by_uuid.clear()
212 class Inodes(object):
213 """Manage the set of inodes. This is the mapping from a numeric id
214 to a concrete File or Directory object"""
216 def __init__(self, inode_cache, encoding="utf-8"):
218 self._counter = itertools.count(llfuse.ROOT_INODE)
219 self.inode_cache = inode_cache
220 self.encoding = encoding
221 self.deferred_invalidations = []
223 def __getitem__(self, item):
224 return self._entries[item]
226 def __setitem__(self, key, item):
227 self._entries[key] = item
230 return self._entries.iterkeys()
233 return self._entries.items()
235 def __contains__(self, k):
236 return k in self._entries
238 def touch(self, entry):
239 entry._atime = time.time()
240 self.inode_cache.touch(entry)
242 def add_entry(self, entry):
243 entry.inode = next(self._counter)
244 if entry.inode == llfuse.ROOT_INODE:
246 self._entries[entry.inode] = entry
247 self.inode_cache.manage(entry)
250 def del_entry(self, entry):
251 if entry.ref_count == 0:
252 self.inode_cache.unmanage(entry)
253 del self._entries[entry.inode]
254 with llfuse.lock_released:
256 self.invalidate_inode(entry.inode)
260 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
262 def invalidate_inode(self, inode):
263 llfuse.invalidate_inode(inode)
265 def invalidate_entry(self, inode, name):
266 llfuse.invalidate_entry(inode, name.encode(self.encoding))
269 self.inode_cache.clear()
271 for k,v in self._entries.items():
274 except Exception as e:
275 _logger.exception("Error during finalize of inode %i", k)
277 self._entries.clear()
280 def catch_exceptions(orig_func):
281 """Catch uncaught exceptions and log them consistently."""
283 @functools.wraps(orig_func)
284 def catch_exceptions_wrapper(self, *args, **kwargs):
286 return orig_func(self, *args, **kwargs)
287 except llfuse.FUSEError:
289 except EnvironmentError as e:
290 raise llfuse.FUSEError(e.errno)
291 except arvados.errors.KeepWriteError as e:
292 _logger.error("Keep write error: " + str(e))
293 raise llfuse.FUSEError(errno.EIO)
294 except arvados.errors.NotFoundError as e:
295 _logger.error("Block not found error: " + str(e))
296 raise llfuse.FUSEError(errno.EIO)
298 _logger.exception("Unhandled exception during FUSE operation")
299 raise llfuse.FUSEError(errno.EIO)
301 return catch_exceptions_wrapper
304 class Operations(llfuse.Operations):
305 """This is the main interface with llfuse.
307 The methods on this object are called by llfuse threads to service FUSE
308 events to query and read from the file system.
310 llfuse has its own global lock which is acquired before calling a request handler,
311 so request handlers do not run concurrently unless the lock is explicitly released
312 using 'with llfuse.lock_released:'
316 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
317 super(Operations, self).__init__()
319 self._api_client = api_client
322 inode_cache = InodeCache(cap=256*1024*1024)
323 self.inodes = Inodes(inode_cache, encoding=encoding)
326 self.enable_write = enable_write
328 # dict of inode to filehandle
329 self._filehandles = {}
330 self._filehandles_counter = itertools.count(0)
332 # Other threads that need to wait until the fuse driver
333 # is fully initialized should wait() on this event object.
334 self.initlock = threading.Event()
336 # If we get overlapping shutdown events (e.g., fusermount -u
337 # -z and operations.destroy()) llfuse calls forget() on inodes
338 # that have already been deleted. To avoid this, we make
339 # forget() a no-op if called after destroy().
340 self._shutdown_started = threading.Event()
342 self.num_retries = num_retries
344 self.read_counter = arvados.keep.Counter()
345 self.write_counter = arvados.keep.Counter()
346 self.read_ops_counter = arvados.keep.Counter()
347 self.write_ops_counter = arvados.keep.Counter()
352 # Allow threads that are waiting for the driver to be finished
353 # initializing to continue
358 self._shutdown_started.set()
365 def access(self, inode, mode, ctx):
368 def listen_for_events(self):
369 self.events = arvados.events.subscribe(
371 [["event_type", "in", ["create", "update", "delete"]]],
375 def on_event(self, ev):
376 if 'event_type' not in ev:
379 new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
380 pdh = new_attrs.get("portable_data_hash")
381 # new_attributes.modified_at currently lacks
382 # subsecond precision (see #6347) so use event_at
383 # which should always be the same.
384 stamp = ev.get("event_at")
386 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
388 if stamp and pdh and ev.get("object_kind") == "arvados#collection":
389 item.update(to_record_version=(stamp, pdh))
393 oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
394 newowner = ev.get("object_owner_uuid")
396 self.inodes.inode_cache.find_by_uuid(oldowner) +
397 self.inodes.inode_cache.find_by_uuid(newowner)):
402 def getattr(self, inode, ctx=None):
403 if inode not in self.inodes:
404 raise llfuse.FUSEError(errno.ENOENT)
406 e = self.inodes[inode]
408 entry = llfuse.EntryAttributes()
411 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
412 entry.attr_timeout = 60 if e.allow_attr_cache else 0
414 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
415 if isinstance(e, Directory):
416 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
418 entry.st_mode |= stat.S_IFREG
419 if isinstance(e, FuseArvadosFile):
420 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
422 if self.enable_write and e.writable():
423 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
426 entry.st_uid = self.uid
427 entry.st_gid = self.gid
430 entry.st_size = e.size()
432 entry.st_blksize = 512
433 entry.st_blocks = (entry.st_size/512)+1
434 entry.st_atime_ns = int(e.atime() * 1000000000)
435 entry.st_mtime_ns = int(e.mtime() * 1000000000)
436 entry.st_ctime_ns = int(e.mtime() * 1000000000)
441 def setattr(self, inode, attr, fields, fh, ctx):
442 entry = self.getattr(inode)
444 if fh is not None and fh in self._filehandles:
445 handle = self._filehandles[fh]
448 e = self.inodes[inode]
450 if fields.update_size and isinstance(e, FuseArvadosFile):
451 with llfuse.lock_released:
452 e.arvfile.truncate(attr.st_size)
453 entry.st_size = e.arvfile.size()
458 def lookup(self, parent_inode, name, ctx):
459 name = unicode(name, self.inodes.encoding)
465 if parent_inode in self.inodes:
466 p = self.inodes[parent_inode]
469 inode = p.parent_inode
470 elif isinstance(p, Directory) and name in p:
471 inode = p[name].inode
474 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
475 parent_inode, name, inode)
476 self.inodes[inode].inc_ref()
477 return self.getattr(inode)
479 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
481 raise llfuse.FUSEError(errno.ENOENT)
484 def forget(self, inodes):
485 if self._shutdown_started.is_set():
487 for inode, nlookup in inodes:
488 ent = self.inodes[inode]
489 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
490 if ent.dec_ref(nlookup) == 0 and ent.dead:
491 self.inodes.del_entry(ent)
494 def open(self, inode, flags, ctx):
495 if inode in self.inodes:
496 p = self.inodes[inode]
498 raise llfuse.FUSEError(errno.ENOENT)
500 if isinstance(p, Directory):
501 raise llfuse.FUSEError(errno.EISDIR)
503 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
504 raise llfuse.FUSEError(errno.EPERM)
506 fh = next(self._filehandles_counter)
507 self._filehandles[fh] = FileHandle(fh, p)
510 # Normally, we will have received an "update" event if the
511 # parent collection is stale here. However, even if the parent
512 # collection hasn't changed, the manifest might have been
513 # fetched so long ago that the signatures on the data block
514 # locators have expired. Calling checkupdate() on all
515 # ancestors ensures the signatures will be refreshed if
517 while p.parent_inode in self.inodes:
518 if p == self.inodes[p.parent_inode]:
520 p = self.inodes[p.parent_inode]
524 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
529 def read(self, fh, off, size):
530 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
531 self.read_ops_counter.add(1)
533 if fh in self._filehandles:
534 handle = self._filehandles[fh]
536 raise llfuse.FUSEError(errno.EBADF)
538 self.inodes.touch(handle.obj)
540 r = handle.obj.readfrom(off, size, self.num_retries)
542 self.read_counter.add(len(r))
546 def write(self, fh, off, buf):
547 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
548 self.write_ops_counter.add(1)
550 if fh in self._filehandles:
551 handle = self._filehandles[fh]
553 raise llfuse.FUSEError(errno.EBADF)
555 if not handle.obj.writable():
556 raise llfuse.FUSEError(errno.EPERM)
558 self.inodes.touch(handle.obj)
560 w = handle.obj.writeto(off, buf, self.num_retries)
562 self.write_counter.add(w)
566 def release(self, fh):
567 if fh in self._filehandles:
569 self._filehandles[fh].flush()
573 self._filehandles[fh].release()
574 del self._filehandles[fh]
575 self.inodes.inode_cache.cap_cache()
577 def releasedir(self, fh):
581 def opendir(self, inode, ctx):
582 _logger.debug("arv-mount opendir: inode %i", inode)
584 if inode in self.inodes:
585 p = self.inodes[inode]
587 raise llfuse.FUSEError(errno.ENOENT)
589 if not isinstance(p, Directory):
590 raise llfuse.FUSEError(errno.ENOTDIR)
592 fh = next(self._filehandles_counter)
593 if p.parent_inode in self.inodes:
594 parent = self.inodes[p.parent_inode]
596 raise llfuse.FUSEError(errno.EIO)
601 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
605 def readdir(self, fh, off):
606 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
608 if fh in self._filehandles:
609 handle = self._filehandles[fh]
611 raise llfuse.FUSEError(errno.EBADF)
614 while e < len(handle.entries):
615 if handle.entries[e][1].inode in self.inodes:
616 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
620 def statfs(self, ctx):
621 st = llfuse.StatvfsData()
622 st.f_bsize = 128 * 1024
635 def _check_writable(self, inode_parent):
636 if not self.enable_write:
637 raise llfuse.FUSEError(errno.EROFS)
639 if inode_parent in self.inodes:
640 p = self.inodes[inode_parent]
642 raise llfuse.FUSEError(errno.ENOENT)
644 if not isinstance(p, Directory):
645 raise llfuse.FUSEError(errno.ENOTDIR)
648 raise llfuse.FUSEError(errno.EPERM)
653 def create(self, inode_parent, name, mode, flags, ctx):
654 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
656 p = self._check_writable(inode_parent)
659 # The file entry should have been implicitly created by callback.
661 fh = next(self._filehandles_counter)
662 self._filehandles[fh] = FileHandle(fh, f)
666 return (fh, self.getattr(f.inode))
669 def mkdir(self, inode_parent, name, mode, ctx):
670 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
672 p = self._check_writable(inode_parent)
675 # The dir entry should have been implicitly created by callback.
679 return self.getattr(d.inode)
682 def unlink(self, inode_parent, name, ctx):
683 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
684 p = self._check_writable(inode_parent)
688 def rmdir(self, inode_parent, name, ctx):
689 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
690 p = self._check_writable(inode_parent)
694 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx):
695 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
696 src = self._check_writable(inode_parent_old)
697 dest = self._check_writable(inode_parent_new)
698 dest.rename(name_old, name_new, src)
702 if fh in self._filehandles:
703 self._filehandles[fh].flush()
705 def fsync(self, fh, datasync):
708 def fsyncdir(self, fh, datasync):