1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase`, which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before returning a result.
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using `_check_writable()`.
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
# Replace llfuse's internal notification queue at module import time.
# Queue.Queue() constructed without a maxsize argument is unbounded.
# NOTE(review): `llfuse.capi._notify_queue` is a private llfuse attribute, so
# this override depends on the installed llfuse version -- confirm on upgrade.
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# (the full deadlock rationale is in the linked issue).
77 llfuse.capi._notify_queue = Queue.Queue()
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
# Logger shared by the code in this module.
82 _logger = logging.getLogger('arvados.arvados_fuse')
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
# Base class for open-handle bookkeeping. The `class` line itself is not
# visible in this view; FileHandle and DirectoryHandle below derive from it.
91 """Connects a numeric file handle to a File or Directory object that has
92 been opened by the client."""
# Constructor: `fh` is the handle number Operations takes from its
# _filehandles_counter, and `obj` is the File/Directory it refers to. The
# assignment lines are not visible here; later code reads `self.obj`.
94 def __init__(self, fh, obj):
# The `def` line for the next two statements is not visible. Presumably this
# is flush(), which Operations.release() and the flush handler call on
# handles. Only writable objects are flushed, and obj.flush()'s result is
# returned.
103 if self.obj.writable():
104 return self.obj.flush()
# Handle for a regular file. Operations.open() and Operations.create() wrap
# the opened file object in one of these.
107 class FileHandle(Handle):
108 """Connects a numeric file handle to a File object that has
109 been opened by the client."""
class DirectoryHandle(Handle):
    """Handle for a directory that a client has opened.

    Besides what the base ``Handle`` keeps, it carries ``entries``: the list
    of ``(name, object)`` pairs captured when the directory was opened.
    readdir() walks this list by offset.
    """

    def __init__(self, fh, dirobj, entries):
        super(DirectoryHandle, self).__init__(fh=fh, obj=dirobj)
        self.entries = entries
# Soft-limited memory accounting for file system objects. The class docstring
# below describes the eviction policy.
122 class InodeCache(object):
123 """Records the memory footprint of objects and when they are last used.
125 When the cache limit is exceeded, the least recently used objects are
126 cleared. Clearing the object means discarding its contents to release
127 memory. The next time the object is accessed, it must be re-fetched from
128 the server. Note that the inode cache limit is a soft limit; the cache
129 limit may be exceeded if necessary to load very large objects, it may also
130 be exceeded if open file handles prevent objects from being cleared.
# Constructor. Several assignment lines are not visible in this view. From
# their uses elsewhere, the instance also carries:
#   - `cap`: compared with `_total` in cap_cache()
#   - `_total`: running sum of cached sizes
#   - `_by_uuid`: uuid -> list of cached objects
# `_entries` maps cache_priority -> object. Priorities come from the
# increasing `_counter` in manage(), so iteration order is oldest first.
# `min_entries` is the floor below which cap_cache() stops evicting (the
# loop exit itself is not visible; presumably a break).
134 def __init__(self, cap, min_entries=4):
135 self._entries = collections.OrderedDict()
137 self._counter = itertools.count(0)
140 self.min_entries = min_entries
# Remove `obj` from the cache bookkeeping:
#   - subtract its recorded size from `_total`
#   - delete its priority slot
#   - drop it from the uuid index, removing the uuid key once its list is empty
# `clear` presumably selects whether the object's contents are also
# discarded: touch() passes False, while unmanage() and cap_cache() pass
# True. Several guard lines between the statements below are not visible in
# this view. As a result, the conditions that produce the "cannot clear ...
# in use" and "invalidate" log messages cannot be confirmed from here.
145 def _remove(self, obj, clear):
148 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
150 if obj.has_ref(only_children=True):
152 _logger.debug("InodeCache invalidate inode %i", obj.inode)
155 self._total -= obj.cache_size
156 del self._entries[obj.cache_priority]
# Also forget the object in the uuid index used by find_by_uuid().
158 self._by_uuid[obj.cache_uuid].remove(obj)
159 if not self._by_uuid[obj.cache_uuid]:
160 del self._by_uuid[obj.cache_uuid]
161 obj.cache_uuid = None
163 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
# Body of the eviction pass. Its `def` line is not visible; Operations.release()
# calls self.inodes.inode_cache.cap_cache(). When `_total` exceeds `cap`, it
# walks entries oldest-priority first and removes each with clear=True.
# Iterating over a list copy of the keys is required because _remove()
# deletes from `_entries` during the loop.
# NOTE(review): the loop is entered on `_total > cap` but only stops on
# `_total < cap`, so landing exactly on `cap` evicts one more entry. Confirm
# whether `<=` was intended. The statement run when the condition holds is
# not visible (presumably `break`).
167 if self._total > self.cap:
168 for key in list(self._entries.keys()):
169 if self._total < self.cap or len(self._entries) < self.min_entries:
171 self._remove(self._entries[key], True)
# Start tracking `obj`:
#   - assign it the next (newest) priority
#   - record its size
#   - index it by uuid
#   - add its size to `_total`
# Guard lines are not visible in this view. Presumably only some objects are
# tracked, and the final `cache_priority = None` applies to the rest.
# NOTE(review): objsize() is called three times (for cache_size, for the
# `_total` increment, and in the log call). If the size can change between
# calls, `_total` drifts from the `cache_size` that _remove() later
# subtracts. `self._total += obj.cache_size` would keep the two consistent.
173 def manage(self, obj):
175 obj.cache_priority = next(self._counter)
176 obj.cache_size = obj.objsize()
177 self._entries[obj.cache_priority] = obj
178 obj.cache_uuid = obj.uuid()
# Maintain the uuid -> [objects] index that Operations.on_event() queries
# through find_by_uuid().
180 if obj.cache_uuid not in self._by_uuid:
181 self._by_uuid[obj.cache_uuid] = [obj]
183 if obj not in self._by_uuid[obj.cache_uuid]:
184 self._by_uuid[obj.cache_uuid].append(obj)
185 self._total += obj.objsize()
186 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
189 obj.cache_priority = None
# Refresh `obj`'s position in the eviction order. If it is tracked, it is
# first removed without clearing (clear=False). The re-insertion is not
# visible in this view; presumably manage(obj) follows so the object gets a
# new, newest priority -- confirm.
191 def touch(self, obj):
193 if obj.cache_priority in self._entries:
194 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking *obj*, asking _remove() to clear it (clear=True).

    Only objects that are persisted AND currently hold a slot in the cache
    are affected; any other object is left untouched.
    """
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return the list of cached objects registered under *uuid*.

    An empty list is returned when nothing is cached for that uuid.
    """
    if uuid in self._by_uuid:
        return self._by_uuid[uuid]
    return []
# Body of the cache reset method. Its `def` line is not visible; Inodes'
# reset code calls self.inode_cache.clear(). It drops the priority map and
# the uuid index.
# NOTE(review): no reset of `_total` is visible here. Confirm it is handled
# elsewhere, or that the cache is never reused after clearing.
205 self._entries.clear()
206 self._by_uuid.clear()
209 class Inodes(object):
210 """Manage the set of inodes. This is the mapping from a numeric id
211 to a concrete File or Directory object"""
# Constructor. Inode numbers come from a counter starting at
# llfuse.ROOT_INODE, so the first object passed to add_entry() becomes the
# root. `encoding` converts between llfuse byte names and the names used by
# directory objects; lookup(), readdir() and invalidate_entry() rely on it.
# The assignment creating `_entries` is not visible here.
# `deferred_invalidations` is not referenced in the visible parts of this file.
213 def __init__(self, inode_cache, encoding="utf-8"):
215 self._counter = itertools.count(llfuse.ROOT_INODE)
216 self.inode_cache = inode_cache
217 self.encoding = encoding
218 self.deferred_invalidations = []
def __getitem__(self, item):
    """Look up the file system object registered for inode number *item*."""
    entries = self._entries
    return entries[item]
def __setitem__(self, key, item):
    """Register *item* as the object for inode number *key*."""
    entries = self._entries
    entries[key] = item
# Body of the key-iteration method (its `def` line is not visible).
# dict.iterkeys() exists only on Python 2.
227 return self._entries.iterkeys()
# Body of a method returning all (inode, object) pairs (its `def` line is not
# visible). On Python 2, dict.items() returns a list copy rather than a view,
# assuming `_entries` is a plain dict -- confirm.
230 return self._entries.items()
def __contains__(self, k):
    """Report whether inode number *k* is currently registered."""
    registered = self._entries
    return k in registered
def touch(self, entry):
    """Record an access to *entry*: stamp its access time, then notify the cache."""
    now = time.time()
    entry._atime = now
    cache = self.inode_cache
    cache.touch(entry)
# Register a new file system object: give it the next inode number, store it
# in the table, and hand it to the inode cache for size tracking. The
# statement run for the root inode (after the `if`) and any return statement
# are not visible in this view.
239 def add_entry(self, entry):
240 entry.inode = next(self._counter)
241 if entry.inode == llfuse.ROOT_INODE:
243 self._entries[entry.inode] = entry
244 self.inode_cache.manage(entry)
# Retire an object. When its reference count is zero, it:
#   - stops caching the object
#   - removes it from the table
#   - with llfuse's global lock released, asks llfuse to invalidate its inode
# The debug message appears to belong to the other branch (entry still
# referenced). That `else:` and a few lines in between are not visible in
# this view.
247 def del_entry(self, entry):
248 if entry.ref_count == 0:
249 self.inode_cache.unmanage(entry)
250 del self._entries[entry.inode]
251 with llfuse.lock_released:
253 self.invalidate_inode(entry.inode)
257 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Forward an invalidation request for *inode* to llfuse."""
    invalidate = llfuse.invalidate_inode
    invalidate(inode)
def invalidate_entry(self, inode, name):
    """Ask llfuse to invalidate directory entry *name* under directory *inode*.

    *name* is first encoded with this table's configured encoding.
    """
    encoded_name = name.encode(self.encoding)
    llfuse.invalidate_entry(inode, encoded_name)
# Body of the table reset (its `def` line is not visible). Steps:
#   1. Clear the inode cache.
#   2. Walk every entry inside a try block whose body is not visible (the
#      log message indicates it finalizes each object). Failures are logged
#      with a traceback and do not stop the loop.
#   3. Empty the table.
# NOTE(review): the bound exception name `e` is unused.
266 self.inode_cache.clear()
268 for k,v in self._entries.items():
271 except Exception as e:
272 _logger.exception("Error during finalize of inode %i", k)
274 self._entries.clear()
277 def catch_exceptions(orig_func):
278 """Catch uncaught exceptions and log them consistently."""
# Decorator for FUSE request handlers. Failures are turned into
# llfuse.FUSEError, which is how a handler reports an errno through llfuse:
#   - FUSEError: handled by a line not visible here (presumably re-raised)
#   - EnvironmentError (OSError/IOError): converted using its own errno
#   - Keep write / NotFound errors: logged at error level, reported as EIO
#   - anything else (its `except` line is not visible): logged with a
#     traceback, reported as EIO
# NOTE(review): an EnvironmentError raised without an errno has e.errno ==
# None, which would be passed to FUSEError unchanged -- consider EIO instead.
# functools.wraps keeps the handler's name and docstring on the wrapper.
280 @functools.wraps(orig_func)
281 def catch_exceptions_wrapper(self, *args, **kwargs):
283 return orig_func(self, *args, **kwargs)
284 except llfuse.FUSEError:
286 except EnvironmentError as e:
287 raise llfuse.FUSEError(e.errno)
288 except arvados.errors.KeepWriteError as e:
289 _logger.error("Keep write error: " + str(e))
290 raise llfuse.FUSEError(errno.EIO)
291 except arvados.errors.NotFoundError as e:
292 _logger.error("Block not found error: " + str(e))
293 raise llfuse.FUSEError(errno.EIO)
295 _logger.exception("Unhandled exception during FUSE operation")
296 raise llfuse.FUSEError(errno.EIO)
298 return catch_exceptions_wrapper
# Main llfuse.Operations implementation; one instance serves one mount point.
301 class Operations(llfuse.Operations):
302 """This is the main interface with llfuse.
304 The methods on this object are called by llfuse threads to service FUSE
305 events to query and read from the file system.
307 llfuse has its own global lock which is acquired before calling a request handler,
308 so request handlers do not run concurrently unless the lock is explicitly released
309 using 'with llfuse.lock_released:'
# Constructor. Visible setup:
#   - API client
#   - inode table, with a default cache of cap=256*1024*1024 (256 MiB if
#     objsize() reports bytes), presumably only when no `inode_cache` is
#     passed (the guard line is not visible)
#   - write toggle
#   - handle table
#   - init/shutdown events
#   - I/O statistics counters
# getattr() reports self.uid/self.gid; their assignments are not visible here.
313 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
314 super(Operations, self).__init__()
316 self._api_client = api_client
319 inode_cache = InodeCache(cap=256*1024*1024)
320 self.inodes = Inodes(inode_cache, encoding=encoding)
323 self.enable_write = enable_write
325 # dict of file handle number -> Handle object (FileHandle or DirectoryHandle);
# keys are drawn from _filehandles_counter in open(), opendir() and create().
326 self._filehandles = {}
327 self._filehandles_counter = itertools.count(0)
329 # Other threads that need to wait until the fuse driver
330 # is fully initialized should wait() on this event object.
331 self.initlock = threading.Event()
333 # If we get overlapping shutdown events (e.g., fusermount -u
334 # -z and operations.destroy()) llfuse calls forget() on inodes
335 # that have already been deleted. To avoid this, we make
336 # forget() a no-op if called after destroy().
337 self._shutdown_started = threading.Event()
339 self.num_retries = num_retries
# Statistics: read()/write() add byte counts to read_counter/write_counter
# and add 1 per call to the *_ops_counter pair.
341 self.read_counter = arvados.keep.Counter()
342 self.write_counter = arvados.keep.Counter()
343 self.read_ops_counter = arvados.keep.Counter()
344 self.write_ops_counter = arvados.keep.Counter()
# The method that owns this comment is not visible. Presumably it sets
# self.initlock (created in __init__) once the mount is ready.
349 # Allow threads that are waiting for the driver to be finished
350 # initializing to continue
# Shutdown marker. The enclosing `def` is not visible; the comment in
# __init__ names destroy(). forget() checks this event before doing anything.
356 self._shutdown_started.set()
# Access-permission hook invoked through llfuse; its body is not visible in this view.
363 def access(self, inode, mode, ctx):
# Subscribe to the Arvados event bus, filtered to create/update/delete events.
# The remaining subscribe() arguments (presumably the on_event callback) are
# not visible.
366 def listen_for_events(self):
367 self.events = arvados.events.subscribe(self._api_client,
368 [["event_type", "in", ["create", "update", "delete"]]],
# Event-bus callback.
#   - Events lacking "event_type" take a branch not visible here (presumably
#     ignored).
#   - For every cached object registered under the event's object_uuid,
#     collections are refreshed via item.update(to_record_version=...). The
#     version is (event_at, portable_data_hash) when the event carries
#     new_attributes, and None otherwise.
#   - Cached objects for the old owner (properties.old_attributes.owner_uuid)
#     and the new owner (object_owner_uuid) are then visited; what is done to
#     them is not visible.
372 def on_event(self, ev):
373 if 'event_type' not in ev:
376 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
378 if ev["object_kind"] == "arvados#collection":
# Evaluates to the new_attributes dict, or to a falsy value when the event
# has no properties or no new_attributes.
379 new_attr = (ev.get("properties") and
380 ev["properties"].get("new_attributes") and
381 ev["properties"]["new_attributes"])
383 # new_attributes.modified_at currently lacks
384 # subsecond precision (see #6347) so use event_at
385 # which should always be the same.
387 (ev["event_at"], new_attr["portable_data_hash"])
388 if new_attr else None)
390 item.update(to_record_version=record_version)
# Old owner: falsy when the event carries no old_attributes.owner_uuid.
395 ev.get("properties") and
396 ev["properties"].get("old_attributes") and
397 ev["properties"]["old_attributes"].get("owner_uuid"))
398 newowner = ev["object_owner_uuid"]
400 self.inodes.inode_cache.find_by_uuid(oldowner) +
401 self.inodes.inode_cache.find_by_uuid(newowner)):
# Build llfuse.EntryAttributes for `inode`; raises ENOENT for unknown inodes.
# The kernel may cache entries/attributes for 60 when the object allows it,
# and 0 otherwise.
407 def getattr(self, inode):
408 if inode not in self.inodes:
409 raise llfuse.FUSEError(errno.ENOENT)
411 e = self.inodes[inode]
413 entry = llfuse.EntryAttributes()
416 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
417 entry.attr_timeout = 60 if e.allow_attr_cache else 0
# Read bits for user/group/other on everything; directories also get execute
# (search) bits.
419 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
420 if isinstance(e, Directory):
421 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
# Regular-file branch (its `else:` line is not visible). FuseArvadosFile
# objects additionally get execute bits.
423 entry.st_mode |= stat.S_IFREG
424 if isinstance(e, FuseArvadosFile):
425 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
# Write bits only when the mount allows writes and the object is writable.
427 if self.enable_write and e.writable():
428 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
431 entry.st_uid = self.uid
432 entry.st_gid = self.gid
435 entry.st_size = e.size()
# NOTE(review): st_blocks counts 512-byte units. With Python 2 integer
# division this reports one extra block for sizes that are exact multiples of
# 512, and 1 block for an empty file. (entry.st_size + 511) // 512 would be
# the ceiling. On Python 3, `/` would make this a float.
437 entry.st_blksize = 512
438 entry.st_blocks = (entry.st_size/512)+1
439 entry.st_atime = int(e.atime())
440 entry.st_mtime = int(e.mtime())
# ctime is reported as the modification time.
441 entry.st_ctime = int(e.mtime())
# Apply attribute changes. Only size changes are visible here: for
# FuseArvadosFile objects the file is truncated to attr.st_size (with the
# llfuse lock released) and the reported size is refreshed. Calling getattr()
# first raises ENOENT for unknown inodes. The return statement is not visible.
446 def setattr(self, inode, attr):
447 entry = self.getattr(inode)
449 e = self.inodes[inode]
451 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
452 with llfuse.lock_released:
453 e.arvfile.truncate(attr.st_size)
454 entry.st_size = e.arvfile.size()
# Resolve `name` inside directory `parent_inode`. The raw name from llfuse is
# decoded with the inode table's encoding (Python 2 `unicode`). A successful
# lookup increments the found object's reference count (forget() later
# decrements it) and returns its attributes; otherwise the result is ENOENT.
# Some lines between the decode and the search are not visible. The
# `p.parent_inode` branch presumably handles "..".
459 def lookup(self, parent_inode, name):
460 name = unicode(name, self.inodes.encoding)
466 if parent_inode in self.inodes:
467 p = self.inodes[parent_inode]
470 inode = p.parent_inode
471 elif isinstance(p, Directory) and name in p:
472 inode = p[name].inode
475 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
476 parent_inode, name, inode)
477 self.inodes[inode].inc_ref()
478 return self.getattr(inode)
480 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
482 raise llfuse.FUSEError(errno.ENOENT)
# Drop kernel lookup references. It checks the shutdown event first; the
# early-exit line is not visible, but the __init__ comment explains that
# forget() must be a no-op after destroy(). For each (inode, nlookup) pair,
# the reference count is reduced by nlookup. The object is deleted from the
# table once the count reaches zero and the object is marked dead.
# NOTE(review): self.inodes[inode] raises for an inode no longer in the table.
485 def forget(self, inodes):
486 if self._shutdown_started.is_set():
488 for inode, nlookup in inodes:
489 ent = self.inodes[inode]
490 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
491 if ent.dec_ref(nlookup) == 0 and ent.dead:
492 self.inodes.del_entry(ent)
# Open a regular file. Errors:
#   - ENOENT for unknown inodes (the `else:` line is not visible)
#   - EISDIR for directories
#   - EPERM when write access is requested (O_WRONLY or O_RDWR) on an
#     object that is not writable
# On success it allocates a new handle number and registers a FileHandle.
# The return statement is not visible (presumably the handle number).
495 def open(self, inode, flags):
496 if inode in self.inodes:
497 p = self.inodes[inode]
499 raise llfuse.FUSEError(errno.ENOENT)
501 if isinstance(p, Directory):
502 raise llfuse.FUSEError(errno.EISDIR)
504 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
505 raise llfuse.FUSEError(errno.EPERM)
507 fh = next(self._filehandles_counter)
508 self._filehandles[fh] = FileHandle(fh, p)
511 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
# Read up to `size` bytes at offset `off` from an open handle; EBADF for
# unknown handles. It counts the operation, marks the object recently used in
# the inode cache, and adds the length of the data returned by readfrom() to
# read_counter. The return statement is not visible.
516 def read(self, fh, off, size):
517 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
518 self.read_ops_counter.add(1)
520 if fh in self._filehandles:
521 handle = self._filehandles[fh]
523 raise llfuse.FUSEError(errno.EBADF)
525 self.inodes.touch(handle.obj)
527 r = handle.obj.readfrom(off, size, self.num_retries)
529 self.read_counter.add(len(r))
# Write `buf` at offset `off` through an open handle. Errors: EBADF for
# unknown handles, EPERM if the object is not writable. It counts the
# operation, marks the object recently used, and adds writeto()'s return value
# to write_counter. The return statement is not visible.
# NOTE(review): only obj.writable() is checked here, not self.enable_write --
# confirm writable() already reflects the mount's write setting.
533 def write(self, fh, off, buf):
534 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
535 self.write_ops_counter.add(1)
537 if fh in self._filehandles:
538 handle = self._filehandles[fh]
540 raise llfuse.FUSEError(errno.EBADF)
542 if not handle.obj.writable():
543 raise llfuse.FUSEError(errno.EPERM)
545 self.inodes.touch(handle.obj)
547 w = handle.obj.writeto(off, buf, self.num_retries)
549 self.write_counter.add(w)
# Close a handle. Visible steps for a known handle number: flush it, release
# it, and delete it from the handle table; the inode cache is then asked to
# evict if it is over its cap. The lines around the flush (presumably error
# handling) are not visible.
553 def release(self, fh):
554 if fh in self._filehandles:
556 self._filehandles[fh].flush()
560 self._filehandles[fh].release()
561 del self._filehandles[fh]
562 self.inodes.inode_cache.cap_cache()
# Close a directory handle; its body is not visible in this view.
564 def releasedir(self, fh):
# Open a directory. Errors:
#   - ENOENT for unknown inodes
#   - ENOTDIR for non-directories
#   - EIO if the directory's parent is not in the inode table
# A new DirectoryHandle stores a snapshot list of (name, object) pairs: "."
# and ".." first, then the directory's current items. readdir() pages through
# that list. Lines between the parent check and the snapshot, and the return
# statement, are not visible.
568 def opendir(self, inode):
569 _logger.debug("arv-mount opendir: inode %i", inode)
571 if inode in self.inodes:
572 p = self.inodes[inode]
574 raise llfuse.FUSEError(errno.ENOENT)
576 if not isinstance(p, Directory):
577 raise llfuse.FUSEError(errno.ENOTDIR)
579 fh = next(self._filehandles_counter)
580 if p.parent_inode in self.inodes:
581 parent = self.inodes[p.parent_inode]
583 raise llfuse.FUSEError(errno.EIO)
588 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
# Generator over the snapshot taken by opendir(); EBADF for unknown handles.
# Each yielded tuple is (encoded name, attributes, next offset). Entries whose
# object no longer has an inode in the table are skipped. The initialisation
# of `e` (presumably from `off`) and the loop increment are not visible here.
592 def readdir(self, fh, off):
593 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
595 if fh in self._filehandles:
596 handle = self._filehandles[fh]
598 raise llfuse.FUSEError(errno.EBADF)
601 while e < len(handle.entries):
602 if handle.entries[e][1].inode in self.inodes:
603 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
# Body of the file system statistics handler (its `def` line is not visible).
# It reports a 128 KiB block size; the remaining fields are not visible.
608 st = llfuse.StatvfsData()
609 st.f_bsize = 128 * 1024
# Validate that a modification under directory `inode_parent` is allowed:
#   - EROFS if the mount is read-only (enable_write is false)
#   - ENOENT for unknown inodes
#   - ENOTDIR for non-directories
#   - EPERM under a condition not visible here (presumably the directory is
#     not writable)
# Callers (create, mkdir, unlink, rmdir, rename) use the return value as the
# parent directory object; the return line is not visible.
622 def _check_writable(self, inode_parent):
623 if not self.enable_write:
624 raise llfuse.FUSEError(errno.EROFS)
626 if inode_parent in self.inodes:
627 p = self.inodes[inode_parent]
629 raise llfuse.FUSEError(errno.ENOENT)
631 if not isinstance(p, Directory):
632 raise llfuse.FUSEError(errno.ENOTDIR)
635 raise llfuse.FUSEError(errno.EPERM)
# Create file `name` in `inode_parent` and open it. The directory is
# validated by _check_writable(). The creation call and the lookup of the new
# file object `f` are not visible here. It registers a FileHandle and returns
# (handle number, attributes).
640 def create(self, inode_parent, name, mode, flags, ctx):
641 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
643 p = self._check_writable(inode_parent)
646 # The file entry should have been implicitly created by callback.
648 fh = next(self._filehandles_counter)
649 self._filehandles[fh] = FileHandle(fh, f)
653 return (fh, self.getattr(f.inode))
# Create directory `name` in `inode_parent` after _check_writable(). The
# creation call and the lookup of the new directory object `d` are not
# visible here. Returns the new directory's attributes.
656 def mkdir(self, inode_parent, name, mode, ctx):
657 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
659 p = self._check_writable(inode_parent)
662 # The dir entry should have been implicitly created by callback.
666 return self.getattr(d.inode)
# Remove file `name` from `inode_parent` after _check_writable(); the removal
# call itself is not visible in this view.
669 def unlink(self, inode_parent, name):
670 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
671 p = self._check_writable(inode_parent)
# Remove directory `name` from `inode_parent` after _check_writable(); the
# removal call itself is not visible in this view.
675 def rmdir(self, inode_parent, name):
676 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
677 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move *name_old* in directory *inode_parent_old* to *name_new* in
    directory *inode_parent_new*.

    Both parent directories are validated with _check_writable(), the old
    parent first. The move itself is delegated to the destination directory
    object, which receives the source directory as its third argument.
    """
    _logger.debug(
        "arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'",
        inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
# Body of the flush handler (its `def` line is not visible). It flushes the
# handle when the number is known; unknown numbers are silently ignored.
689 if fh in self._filehandles:
690 self._filehandles[fh].flush()
# fsync and fsyncdir handlers; their bodies are not visible in this view.
692 def fsync(self, fh, datasync):
695 def fsyncdir(self, fh, datasync):