1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before returning.
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
# Swap llfuse's default (bounded) _notify_queue for an unbounded
# Queue.Queue(); which attribute is patched depends on whether llfuse
# exposes a `capi` submodule.
# NOTE(review): original lines 75-76, 78 and 80-81 are missing from this
# listing. Presumably one of them is an `else:` so that only one of the two
# assignments below runs -- confirm against the full file.
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
77 if hasattr(llfuse, 'capi'):
79 llfuse.capi._notify_queue = Queue.Queue()
82 llfuse._notify_queue = Queue.Queue()
84 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
85 from fusefile import StringFile, FuseArvadosFile
# Logger shared by every part of the FUSE driver.
_logger = logging.getLogger(name='arvados.arvados_fuse')

# To see llfuse's own debug logging, uncomment the following lines:
# log_handler = logging.StreamHandler()
# llogger = logging.getLogger('llfuse')
# llogger.addHandler(log_handler)
# llogger.setLevel(logging.DEBUG)
96 """Connects a numeric file handle to a File or Directory object that has
97 been opened by the client."""
# NOTE(review): the class statement (line 95) and the body of __init__
# (lines 100-110) are missing from this listing. Other code reads
# `handle.obj` and calls `.release()` on handles, and DirectoryHandle passes
# (fh, dirobj) here, so presumably __init__ stores at least `obj` -- confirm.
99 def __init__(self, fh, obj):
111 class FileHandle(Handle):
112 """Connects a numeric file handle to a File object that has
113 been opened by the client."""
# NOTE(review): the `def` line of the method below (line 115) is missing from
# this listing. Operations calls `.flush()` on entries of _filehandles, so
# presumably it is `def flush(self):`.
# Flushes the underlying file object only when it is writable.
116 if self.obj.writable():
117 return self.obj.flush()
class DirectoryHandle(Handle):
    """Numeric file handle bound to a Directory the client has opened.

    Besides the directory object, it keeps the list of (name, object)
    pairs captured when the handle was created; readdir() pages through
    that list.
    """

    def __init__(self, fh, dirobj, entries):
        super(DirectoryHandle, self).__init__(fh, dirobj)
        # Listing snapshot, e.g. [('.', directory), ('..', parent), ...].
        self.entries = entries
129 class InodeCache(object):
130 """Records the memory footprint of objects and when they are last used.
132 When the cache limit is exceeded, the least recently used objects are
133 cleared. Clearing the object means discarding its contents to release
134 memory. The next time the object is accessed, it must be re-fetched from
135 the server. Note that the inode cache limit is a soft limit; the cache
136 limit may be exceeded if necessary to load very large objects, it may also
137 be exceeded if open file handles prevent objects from being cleared.
141 def __init__(self, cap, min_entries=4):
# cap: soft limit compared against the running total of the tracked
#   objects' recorded sizes (eviction starts when the total exceeds it).
# min_entries: eviction stops once fewer than this many entries remain.
# _entries keeps insertion order; eviction walks it front to back.
142 self._entries = collections.OrderedDict()
# NOTE(review): lines 143-145 are missing from this listing; the rest of the
# class uses self._total, self.cap and self._by_uuid, so presumably they are
# initialized there -- confirm.
146 self.min_entries = min_entries
151 def _remove(self, obj, clear):
# Stop tracking `obj`. When `clear` is true the object is presumably also
# cleared (its contents discarded), unless it is still in use -- see the
# "cannot clear" debug message below.
# NOTE(review): lines 152-153, 155, 159-161, 167-168, 171 and 176 are missing
# from this listing, so the exact control flow around the clear / in-use /
# kernel-invalidate checks cannot be confirmed from here.
154 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
# If obj.has_ref(True) (presumably: still referenced by the kernel), ask for a
# kernel cache invalidation of this inode.
156 if obj.has_ref(True):
157 obj.kernel_invalidate()
158 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
162 # The llfuse lock is released in del_entry(), which is called by
163 # Directory.clear(). While the llfuse lock is released, it can happen
164 # that a reentrant call removes this entry before this call gets to it.
165 # Ensure that the entry is still valid before trying to remove it.
166 if obj.inode not in self._entries:
# Bookkeeping: subtract the size recorded by manage(), drop the inode, and
# remove obj from its per-uuid list (deleting the list once it is empty).
169 self._total -= obj.cache_size
170 del self._entries[obj.inode]
172 self._by_uuid[obj.cache_uuid].remove(obj)
173 if not self._by_uuid[obj.cache_uuid]:
174 del self._by_uuid[obj.cache_uuid]
175 obj.cache_uuid = None
177 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
# NOTE(review): the `def` line (179) is missing; Operations.release() calls
# inode_cache.cap_cache(), so presumably this is `def cap_cache(self):`.
# When the total exceeds cap, entries are cleared in insertion order (oldest
# first) until the total drops below cap or fewer than min_entries remain
# (line 183, presumably `break`, is missing).
# NOTE(review): _remove() deletes from self._entries inside this loop. That is
# only safe because Python 2's values() returns a list copy; under Python 3
# iterating the OrderedDict view while it shrinks would raise RuntimeError.
180 if self._total > self.cap:
181 for ent in self._entries.values():
182 if self._total < self.cap or len(self._entries) < self.min_entries:
184 self._remove(ent, True)
186 def manage(self, obj):
# Track `obj`: record its size and uuid, index it by inode and by uuid, and
# add its size to the running total.
# NOTE(review): lines 187, 191, 194 and 199-200 are missing from this listing;
# presumably they hold guard conditions (e.g. 194 an `else:`) -- confirm.
188 obj.cache_size = obj.objsize()
189 self._entries[obj.inode] = obj
190 obj.cache_uuid = obj.uuid()
192 if obj.cache_uuid not in self._by_uuid:
193 self._by_uuid[obj.cache_uuid] = [obj]
195 if obj not in self._by_uuid[obj.cache_uuid]:
196 self._by_uuid[obj.cache_uuid].append(obj)
# NOTE(review): this adds a fresh obj.objsize(), while _remove() subtracts the
# recorded obj.cache_size set above. If objsize() can change between the two
# calls, _total drifts; `self._total += obj.cache_size` would keep them
# consistent.
197 self._total += obj.objsize()
198 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
201 def touch(self, obj):
# Mark `obj` as recently used: if it is tracked, untrack it without clearing
# it (clear=False).
# NOTE(review): lines 202 and 205-206 are missing; presumably the object is then
# re-added via manage(), moving it to the end of the insertion order (most
# recently used) -- confirm.
203 if obj.inode in self._entries:
204 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking `obj`, removing it with clear=True.

    Only persisted objects that the cache currently tracks are removed;
    anything else is left alone.
    """
    if not obj.persisted():
        return
    if obj.inode not in self._entries:
        return
    self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return the cached objects whose cache uuid is `uuid`.

    The result is always a new list, which is empty when nothing matches.
    The cache keeps one list per uuid and mutates it in place when objects
    are removed. Handing out that list directly would let callers observe,
    or cause, changes to the cache's bookkeeping, for example while they
    iterate over it and update the objects it contains.

    :param uuid: Arvados object uuid to look up; may be None (e.g. a missing
        owner uuid), which simply matches whatever is registered under it.
    :returns: list of cached file system objects.
    """
    return list(self._by_uuid.get(uuid, ()))
# NOTE(review): the `def` line (214) is missing; Inodes calls
# self.inode_cache.clear(), so presumably this is `def clear(self):`.
# Forget every tracked object, both by inode and by uuid.
# NOTE(review): the visible lines do not reset self._total (line 217 is
# missing); confirm it is reset, otherwise the running total stays stale after
# clear().
215 self._entries.clear()
216 self._by_uuid.clear()
219 class Inodes(object):
220 """Manage the set of inodes. This is the mapping from a numeric id
221 to a concrete File or Directory object"""
223 def __init__(self, inode_cache, encoding="utf-8"):
# NOTE(review): line 224 is missing; the rest of the class uses self._entries
# as the inode -> object map, so presumably it is created there.
# Inode numbers are handed out sequentially starting at llfuse.ROOT_INODE, so
# the first entry added becomes the root.
225 self._counter = itertools.count(llfuse.ROOT_INODE)
226 self.inode_cache = inode_cache
# Encoding used to convert between file names and the byte strings passed to
# and from llfuse.
227 self.encoding = encoding
# Not referenced elsewhere in the visible listing.
228 self.deferred_invalidations = []
def __getitem__(self, item):
    """Look up the File/Directory object registered under inode number `item`.

    This is a direct lookup in the underlying mapping, so an unknown inode
    raises KeyError; callers usually test membership with `in` first.
    """
    table = self._entries
    return table[item]


def __setitem__(self, key, item):
    """Register `item` as the object for inode number `key`, replacing any previous one."""
    table = self._entries
    table[key] = item
# NOTE(review): the `def` lines (235-236 and 238-239) are missing; these are
# presumably __iter__ (over inode numbers) and items() -- confirm.
# iterkeys() exists only on Python 2 dicts.
237 return self._entries.iterkeys()
240 return self._entries.items()
def __contains__(self, k):
    """Return True if inode number `k` is currently registered."""
    table = self._entries
    return k in table


def touch(self, entry):
    """Mark `entry` as just accessed.

    Stamps the entry's access time with the current time, then passes the
    entry to the inode cache's touch().
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
249 def add_entry(self, entry):
# Assign `entry` the next inode number, register it, and start tracking it in
# the inode cache.
250 entry.inode = next(self._counter)
# NOTE(review): line 252 (the body of this `if`, i.e. the root-inode special
# case) and lines 255-256 are missing; presumably the method ends by returning
# `entry` -- confirm.
251 if entry.inode == llfuse.ROOT_INODE:
253 self._entries[entry.inode] = entry
254 self.inode_cache.manage(entry)
257 def del_entry(self, entry):
# Remove `entry` only when its ref_count is zero. lookup() calls inc_ref() and
# forget() calls dec_ref(), which presumably adjust ref_count.
258 if entry.ref_count == 0:
259 self.inode_cache.unmanage(entry)
260 del self._entries[entry.inode]
# The llfuse lock is released here, so other request handlers may run in the
# meantime (InodeCache._remove() guards against the resulting reentrancy).
# NOTE(review): lines 262 and 264-266 are missing; presumably 264-266 contain the
# `else:` branch that leads to the debug message below -- confirm.
261 with llfuse.lock_released:
263 self.invalidate_inode(entry.inode)
267 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Ask the FUSE kernel module to drop its cached data for `inode`."""
    llfuse.invalidate_inode(inode)


def invalidate_entry(self, inode, name):
    """Ask the FUSE kernel module to drop the cached entry `name` under directory `inode`.

    `name` is encoded with this table's encoding before being handed to
    llfuse.
    """
    encoded_name = name.encode(self.encoding)
    llfuse.invalidate_entry(inode, encoded_name)
# NOTE(review): the `def` line (274-275) is missing; presumably `def clear(self):`.
# Tear down the whole table: empty the inode cache, finalize every entry
# (errors are logged and the loop continues), then drop all entries.
276 self.inode_cache.clear()
278 for k,v in self._entries.items():
# NOTE(review): lines 279-280 (presumably `try:` and a finalize call on `v`) are
# missing from this listing.
281 except Exception as e:
282 _logger.exception("Error during finalize of inode %i", k)
284 self._entries.clear()
287 def catch_exceptions(orig_func):
288 """Catch uncaught exceptions and log them consistently."""
# Decorator for methods (the wrapper takes `self`): exceptions escaping the
# wrapped call are converted to llfuse.FUSEError carrying an errno:
#   llfuse.FUSEError           -> passed through (line 295, presumably `raise`, is missing)
#   EnvironmentError           -> FUSEError(e.errno)
#   KeepWriteError / NotFoundError -> logged, FUSEError(EIO)
#   anything else              -> logged with traceback, FUSEError(EIO)
# NOTE(review): lines 289, 292 (presumably `try:`) and 304 (the catch-all
# `except` clause) are missing from this listing.
290 @functools.wraps(orig_func)
291 def catch_exceptions_wrapper(self, *args, **kwargs):
293 return orig_func(self, *args, **kwargs)
294 except llfuse.FUSEError:
# NOTE(review): EnvironmentError.errno can be None (e.g. an IOError raised with
# only a message); FUSEError(None) then carries no usable errno -- consider
# falling back to errno.EIO.
296 except EnvironmentError as e:
297 raise llfuse.FUSEError(e.errno)
298 except arvados.errors.KeepWriteError as e:
299 _logger.error("Keep write error: " + str(e))
300 raise llfuse.FUSEError(errno.EIO)
301 except arvados.errors.NotFoundError as e:
302 _logger.error("Block not found error: " + str(e))
303 raise llfuse.FUSEError(errno.EIO)
305 _logger.exception("Unhandled exception during FUSE operation")
306 raise llfuse.FUSEError(errno.EIO)
308 return catch_exceptions_wrapper
311 class Operations(llfuse.Operations):
312 """This is the main interface with llfuse.
314 The methods on this object are called by llfuse threads to service FUSE
315 events to query and read from the file system.
317 llfuse has its own global lock which is acquired before calling a request handler,
318 so request handlers do not run concurrently unless the lock is explicitly released
319 using 'with llfuse.lock_released:'
323 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
# uid/gid: presumably the owner reported for every file by getattr().
# encoding: converts between file names and the byte strings llfuse uses.
# inode_cache: optional InodeCache; otherwise one with cap=256*1024*1024 is made.
# num_retries: passed through to file readfrom()/writeto().
# enable_write: when false, operations that go through _check_writable() fail
#   with EROFS and getattr() reports no write permission bits.
324 super(Operations, self).__init__()
326 self._api_client = api_client
# NOTE(review): lines 327-328 are missing; presumably they test whether an
# inode_cache was supplied, so the default below is only a fallback.
329 inode_cache = InodeCache(cap=256*1024*1024)
330 self.inodes = Inodes(inode_cache, encoding=encoding)
# NOTE(review): lines 331-332 are missing; getattr() reads self.uid and
# self.gid, so presumably they are stored there.
333 self.enable_write = enable_write
335 # dict of file handle number (fh) to FileHandle/DirectoryHandle
336 self._filehandles = {}
337 self._filehandles_counter = itertools.count(0)
339 # Other threads that need to wait until the fuse driver
340 # is fully initialized should wait() on this event object.
341 self.initlock = threading.Event()
343 # If we get overlapping shutdown events (e.g., fusermount -u
344 # -z and operations.destroy()) llfuse calls forget() on inodes
345 # that have already been deleted. To avoid this, we make
346 # forget() a no-op if called after destroy().
347 self._shutdown_started = threading.Event()
349 self.num_retries = num_retries
# read_counter adds len(data) per read; write_counter adds writeto()'s return
# value; the *_ops_counter pair count read()/write() calls.
351 self.read_counter = arvados.keep.Counter()
352 self.write_counter = arvados.keep.Counter()
353 self.read_ops_counter = arvados.keep.Counter()
354 self.write_ops_counter = arvados.keep.Counter()
# NOTE(review): the `def` line(s) around lines 355-358 are missing; the comment
# below presumably precedes `self.initlock.set()` (line 361, also missing).
359 # Allow threads that are waiting for the driver to be finished
360 # initializing to continue
# Shutdown: once this event is set, forget() does nothing.
365 self._shutdown_started.set()
# NOTE(review): lines 366-369 and 371-372 are missing; what is done while
# holding llfuse.lock cannot be confirmed from this listing.
370 if llfuse.lock.acquire():
373 llfuse.lock.release()
# NOTE(review): the body of access() (line 379 onward) is missing from this listing.
378 def access(self, inode, mode, ctx):
381 def listen_for_events(self):
# Subscribe to the Arvados event bus for create/update/delete events and keep
# the subscription object on self.events.
# NOTE(review): lines 383 and 385-387 (the remaining subscribe() arguments) are
# missing; presumably they include the API client and self.on_event as the
# callback -- confirm.
382 self.events = arvados.events.subscribe(
384 [["event_type", "in", ["create", "update", "delete"]]],
388 def on_event(self, ev):
# Handle one event-bus message. Visible behavior: find cached objects with the
# event's object_uuid; for collection events that carry event_at and a
# portable_data_hash, ask each to update to that record version; then look at
# the cached objects of the object's old and new owner.
# NOTE(review): lines 390-391 are missing (presumably an early return for
# messages without an event_type).
389 if 'event_type' not in ev:
# Missing or null "properties"/"new_attributes" are treated as empty dicts.
392 new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
393 pdh = new_attrs.get("portable_data_hash")
394 # new_attributes.modified_at currently lacks
395 # subsecond precision (see #6347) so use event_at
396 # which should always be the same.
397 stamp = ev.get("event_at")
# NOTE(review): if find_by_uuid() returns the cache's internal per-uuid list and
# item.update() leads to items being removed from the cache, that list changes
# while it is iterated here.
399 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
# NOTE(review): lines 398, 400 and 403-405 are missing (presumably the
# non-collection branch).
401 if stamp and pdh and ev.get("object_kind") == "arvados#collection":
402 item.update(to_record_version=(stamp, pdh))
# NOTE(review): lines 408 and 411-413 are missing; presumably 408 starts a loop
# over the cached objects of both owners.
406 oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
407 newowner = ev.get("object_owner_uuid")
409 self.inodes.inode_cache.find_by_uuid(oldowner) +
410 self.inodes.inode_cache.find_by_uuid(newowner)):
415 def getattr(self, inode, ctx=None):
# Build llfuse.EntryAttributes for `inode`; raises ENOENT for an unknown inode.
# NOTE(review): lines 418, 420, 422-423, 426, 430, 434, 437-438, 441-442, 444,
# 448 and 452-453 are missing; presumably 430 and 452 are `else:` lines.
416 if inode not in self.inodes:
417 raise llfuse.FUSEError(errno.ENOENT)
419 e = self.inodes[inode]
421 entry = llfuse.EntryAttributes()
# The kernel may cache the directory entry / attributes for 60 seconds only
# when the object allows it.
424 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
425 entry.attr_timeout = 60 if e.allow_attr_cache else 0
# Permissions: readable by everyone; directories also get execute (search)
# bits; FuseArvadosFile objects also get execute bits; write bits only when
# the mount has writes enabled and the object itself is writable.
427 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
428 if isinstance(e, Directory):
429 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
431 entry.st_mode |= stat.S_IFREG
432 if isinstance(e, FuseArvadosFile):
433 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
435 if self.enable_write and e.writable():
436 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
439 entry.st_uid = self.uid
440 entry.st_gid = self.gid
443 entry.st_size = e.size()
445 entry.st_blksize = 512
# NOTE(review): this reports one block too many when st_size is an exact
# multiple of 512 (including 0); (entry.st_size + 511) // 512 is exact. It
# also relies on Python 2 integer division.
446 entry.st_blocks = (entry.st_size/512)+1
# Nanosecond timestamps when this llfuse version supports them; ctime mirrors mtime.
447 if hasattr(entry, 'st_atime_ns'):
449 entry.st_atime_ns = int(e.atime() * 1000000000)
450 entry.st_mtime_ns = int(e.mtime() * 1000000000)
451 entry.st_ctime_ns = int(e.mtime() * 1000000000)
# NOTE(review): this fallback passes e.atime / e.mtime to int() without calling
# them, whereas the branch above calls e.atime() / e.mtime(). If these are
# methods, int() raises TypeError here; presumably int(e.atime()) etc. was
# intended.
454 entry.st_atime = int(e.atime)
455 entry.st_mtime = int(e.mtime)
456 entry.st_ctime = int(e.mtime)
461 def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
# Apply attribute changes to `inode`. In the visible code only size changes
# (truncate) on FuseArvadosFile objects are acted upon; `entry` (from
# getattr) has st_size refreshed afterwards and is presumably returned.
462 entry = self.getattr(inode)
464 if fh is not None and fh in self._filehandles:
465 handle = self._filehandles[fh]
# NOTE(review): lines 466-467, 469-471 and 473-474 are missing; presumably `e`
# is taken from the handle when an fh is given, and update_size comes from
# `fields` when llfuse supplies it, else from attr.st_size -- confirm.
468 e = self.inodes[inode]
472 update_size = attr.st_size is not None
475 update_size = fields.update_size
# NOTE(review): no _check_writable()/enable_write check is visible before this
# truncate; confirm read-only mounts cannot reach it.
476 if update_size and isinstance(e, FuseArvadosFile):
# Truncation runs with the llfuse global lock released.
477 with llfuse.lock_released:
478 e.arvfile.truncate(attr.st_size)
479 entry.st_size = e.arvfile.size()
484 def lookup(self, parent_inode, name, ctx=None):
# Resolve `name` inside directory `parent_inode`. On success, call inc_ref() on
# the found inode (balanced later by forget()) and return its attributes;
# otherwise raise ENOENT.
# llfuse passes names as byte strings; decode them with the mount's encoding.
485 name = unicode(name, self.inodes.encoding)
# NOTE(review): lines 486-490, 493-494, 498-499, 504 and 506 are missing;
# presumably 493-494 test for '..', which resolves to the parent's parent_inode.
491 if parent_inode in self.inodes:
492 p = self.inodes[parent_inode]
495 inode = p.parent_inode
496 elif isinstance(p, Directory) and name in p:
497 inode = p[name].inode
500 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
501 parent_inode, name, inode)
502 self.inodes[inode].inc_ref()
503 return self.getattr(inode)
505 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
507 raise llfuse.FUSEError(errno.ENOENT)
510 def forget(self, inodes):
# Drop kernel lookup references. `inodes` is a sequence of (inode, nlookup)
# pairs. An entry is deleted from the table once dec_ref() reports zero and the
# entry is marked dead.
# After shutdown has started this is skipped (line 512, presumably `return`,
# is missing); see the _shutdown_started comment in __init__.
511 if self._shutdown_started.is_set():
513 for inode, nlookup in inodes:
514 ent = self.inodes[inode]
515 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
516 if ent.dec_ref(nlookup) == 0 and ent.dead:
517 self.inodes.del_entry(ent)
520 def open(self, inode, flags, ctx=None):
# Open a file and bind a new numeric file handle to it.
# Errors: ENOENT (unknown inode), EISDIR (directory), EPERM (write access
# requested on a non-writable file).
# NOTE(review): lines 523, 525, 528, 531, 534-535, 542, 545, 547-549 and 551+
# are missing from this listing.
521 if inode in self.inodes:
522 p = self.inodes[inode]
524 raise llfuse.FUSEError(errno.ENOENT)
526 if isinstance(p, Directory):
527 raise llfuse.FUSEError(errno.EISDIR)
# NOTE(review): only p.writable() is consulted here, not self.enable_write;
# confirm writable() already accounts for read-only mounts.
529 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
530 raise llfuse.FUSEError(errno.EPERM)
532 fh = next(self._filehandles_counter)
533 self._filehandles[fh] = FileHandle(fh, p)
536 # Normally, we will have received an "update" event if the
537 # parent collection is stale here. However, even if the parent
538 # collection hasn't changed, the manifest might have been
539 # fetched so long ago that the signatures on the data block
540 # locators have expired. Calling checkupdate() on all
541 # ancestors ensures the signatures will be refreshed if
# Walk up the ancestor chain. The identity check stops at an inode that is its
# own parent (presumably the root); line 545, presumably `break`, is missing.
543 while p.parent_inode in self.inodes:
544 if p == self.inodes[p.parent_inode]:
546 p = self.inodes[p.parent_inode]
550 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
555 def read(self, fh, off, size):
# Read up to `size` bytes at offset `off` from the file behind handle `fh`
# (EBADF for unknown handles). Counts the call and the bytes returned, and
# marks the file as recently used.
# NOTE(review): lines 558, 561 (presumably `else:`), 563, 565, 567 and 569+
# (presumably `return r`) are missing.
556 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
557 self.read_ops_counter.add(1)
559 if fh in self._filehandles:
560 handle = self._filehandles[fh]
562 raise llfuse.FUSEError(errno.EBADF)
564 self.inodes.touch(handle.obj)
566 r = handle.obj.readfrom(off, size, self.num_retries)
568 self.read_counter.add(len(r))
572 def write(self, fh, off, buf):
# Write `buf` at offset `off` to the file behind handle `fh`.
# Errors: EBADF (unknown handle), EPERM (file not writable). Counts the call
# and the value returned by writeto() (presumably bytes written).
# NOTE(review): like open(), this checks only handle.obj.writable(), not
# self.enable_write. Lines 575, 578, 580, 583, 585, 587 and 589+ are missing.
573 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
574 self.write_ops_counter.add(1)
576 if fh in self._filehandles:
577 handle = self._filehandles[fh]
579 raise llfuse.FUSEError(errno.EBADF)
581 if not handle.obj.writable():
582 raise llfuse.FUSEError(errno.EPERM)
584 self.inodes.touch(handle.obj)
586 w = handle.obj.writeto(off, buf, self.num_retries)
588 self.write_counter.add(w)
592 def release(self, fh):
# Close file handle `fh`: flush it, release it, forget it, then let the inode
# cache evict (per the InodeCache docstring, open handles can keep objects from
# being cleared). Unknown handles appear to be ignored.
# NOTE(review): lines 595 and 597-599 (presumably try/except/finally around the
# flush) are missing.
593 if fh in self._filehandles:
594 _logger.debug("arv-mount release fh %i", fh)
596 self._filehandles[fh].flush()
600 self._filehandles[fh].release()
601 del self._filehandles[fh]
602 self.inodes.inode_cache.cap_cache()
# NOTE(review): the body of releasedir() (lines 605-607) is missing.
604 def releasedir(self, fh):
608 def opendir(self, inode, ctx=None):
# Open a directory: snapshot its listing ('.', '..', then its items) into a
# DirectoryHandle under a new file handle number.
# Errors: ENOENT (unknown inode), ENOTDIR (not a directory), EIO (parent inode
# not in the table).
# NOTE(review): lines 610, 613, 615, 618, 622, 624-627 and 629+ are missing.
609 _logger.debug("arv-mount opendir: inode %i", inode)
611 if inode in self.inodes:
612 p = self.inodes[inode]
614 raise llfuse.FUSEError(errno.ENOENT)
616 if not isinstance(p, Directory):
617 raise llfuse.FUSEError(errno.ENOTDIR)
# A handle number is consumed even if the EIO check below fails; harmless, since
# itertools.count never repeats a number.
619 fh = next(self._filehandles_counter)
620 if p.parent_inode in self.inodes:
621 parent = self.inodes[p.parent_inode]
623 raise llfuse.FUSEError(errno.EIO)
628 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
632 def readdir(self, fh, off):
# Generator yielding (encoded name, attributes, next offset) for entries of the
# DirectoryHandle `fh`'s snapshot. Entries whose inode has since left the table
# are skipped.
# NOTE(review): lines 634, 637, 639-640 (presumably `e = off`) and 644
# (presumably `e += 1`) are missing.
633 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
635 if fh in self._filehandles:
636 handle = self._filehandles[fh]
638 raise llfuse.FUSEError(errno.EBADF)
641 while e < len(handle.entries):
642 if handle.entries[e][1].inode in self.inodes:
643 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
647 def statfs(self, ctx=None):
# Report file-system statistics, with a 128 KiB block size.
# NOTE(review): lines 650-661 (the remaining fields and the return) are missing.
648 st = llfuse.StatvfsData()
649 st.f_bsize = 128 * 1024
662 def _check_writable(self, inode_parent):
# Validate that `inode_parent` is a directory that may be modified.
# Errors: EROFS (writes disabled for this mount), ENOENT (unknown inode),
# ENOTDIR (not a directory), EPERM (presumably the directory is not writable;
# lines 673-674 are missing). Callers use the return value as the directory,
# so lines 676-677 are presumably `return p`.
663 if not self.enable_write:
664 raise llfuse.FUSEError(errno.EROFS)
666 if inode_parent in self.inodes:
667 p = self.inodes[inode_parent]
669 raise llfuse.FUSEError(errno.ENOENT)
671 if not isinstance(p, Directory):
672 raise llfuse.FUSEError(errno.ENOTDIR)
675 raise llfuse.FUSEError(errno.EPERM)
680 def create(self, inode_parent, name, mode, flags, ctx=None):
# Create file `name` in directory `inode_parent` and open it; returns
# (file handle number, attributes).
# NOTE(review): lines 682, 684-685, 687 and 690-692 are missing, including the
# statements that create the file and bind `f`.
681 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
683 p = self._check_writable(inode_parent)
686 # The file entry should have been implicitly created by callback.
688 fh = next(self._filehandles_counter)
689 self._filehandles[fh] = FileHandle(fh, f)
693 return (fh, self.getattr(f.inode))
696 def mkdir(self, inode_parent, name, mode, ctx=None):
# Create directory `name` in `inode_parent` and return its attributes.
# NOTE(review): lines 698, 700-701 and 703-705 are missing, including the
# statements that create the directory and bind `d`.
697 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
699 p = self._check_writable(inode_parent)
702 # The dir entry should have been implicitly created by callback.
706 return self.getattr(d.inode)
709 def unlink(self, inode_parent, name, ctx=None):
# Remove file `name` from directory `inode_parent` after _check_writable().
# NOTE(review): lines 712-714, which presumably perform the removal on `p`, are
# missing.
710 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
711 p = self._check_writable(inode_parent)
715 def rmdir(self, inode_parent, name, ctx=None):
# Remove directory `name` from `inode_parent` after _check_writable().
# NOTE(review): lines 718-720 are missing.
716 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
717 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
    """Move `name_old` in directory `inode_parent_old` to `name_new` in `inode_parent_new`.

    Both parent directories are validated with _check_writable(), which
    raises FUSEError when writing is disabled or not permitted. The move
    itself is delegated to the destination directory's rename().
    """
    _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
# NOTE(review): the `def` line (728) is missing; presumably this is the flush
# handler, which flushes the handle only if it is still open.
729 if fh in self._filehandles:
730 self._filehandles[fh].flush()
# NOTE(review): the body of fsync() (line 733 onward) is missing.
732 def fsync(self, fh, datasync):
735 def fsyncdir(self, fh, datasync):