1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before returning a result.
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
# Runs at import time: swap llfuse's internal notification queue for a
# Queue.Queue() created without a maxsize, i.e. an unbounded queue.
72 # Default _notify_queue has a limit of 1000 items, but it really needs to be
73 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# details.
76 llfuse.capi._notify_queue = Queue.Queue()
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
# Module-wide logger shared by every handler in this driver.
_logger = logging.getLogger(name='arvados.arvados_fuse')

# To see llfuse's own debug output, uncomment the lines below.
# llfuse_log = logging.getLogger('llfuse')
# llfuse_log.addHandler(logging.StreamHandler())
# llfuse_log.setLevel(logging.DEBUG)
# NOTE(review): the `class ...:` header and most of this class body are missing
# from this dump (the inner numbering jumps 87->90 and 93->102). FileHandle and
# DirectoryHandle below derive from `Handle`, which is presumably this class.
90 """Connects a numeric file handle to a File or Directory object that has
91 been opened by the client."""
93 def __init__(self, fh, obj):
# fh: the numeric handle number (Operations allocates it from a counter);
# obj: the opened File or Directory object. The body is not visible here.
102 return self.obj.flush()
# ^ Tail of a method, presumably flush(), whose header is not visible. It
#   delegates to the wrapped object and returns that call's result.
class FileHandle(Handle):
    """Handle bound to a regular file that the client has opened.

    This subclass only tags handles that refer to File objects; it adds no
    behaviour of its own beyond what `Handle` provides.
    """
class DirectoryHandle(Handle):
    """Handle bound to a directory that the client has opened.

    Besides the wrapped Directory object, it keeps `entries`: the listing
    captured when the directory was opened (a list of (name, object) pairs
    as built by opendir()), which readdir() walks by offset.
    """

    def __init__(self, fh, dirobj, entries):
        # The base Handle receives the handle number and the directory object;
        # the listing snapshot is stored alongside them.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
# Tracks the size and recency of file system objects and clears the least
# recently used ones when the running total exceeds `cap` (see docstring).
# NOTE(review): this dump omits several lines of the class (the inner numbering
# skips e.g. 129, 134, 136-137, 139-142, 146, 149, 152, 154, 156, 160, 164,
# 169, 173-174, 178 and 181). They presumably include the docstring's closing
# quotes, the initialisation of _total/_by_uuid/cap, and some `def`/`if`/
# `else`/`return` lines, so the nesting below cannot be read off exactly.
120 class InodeCache(object):
121 """Records the memory footprint of objects and when they are last used.
123 When the cache limit is exceeded, the least recently used objects are
124 cleared. Clearing the object means discarding its contents to release
125 memory. The next time the object is accessed, it must be re-fetched from
126 the server. Note that the inode cache limit is a soft limit; the cache
127 limit may be exceeded if necessary to load very large objects, it may also
128 be exceeded if open file handles prevent objects from being cleared.
132 def __init__(self, cap, min_entries=4):
# cap: soft limit on the summed cache_size of managed objects (presumably
#   bytes; Operations builds one with cap=256*1024*1024).
# min_entries: eviction stops once fewer than this many entries remain.
# _entries keeps insertion order and priorities come from an increasing
# counter, so iterating it visits the earliest-registered objects first.
133 self._entries = collections.OrderedDict()
135 self._counter = itertools.count(0)
138 self.min_entries = min_entries
143 def _remove(self, obj, clear):
# Drop obj from the bookkeeping: subtract its recorded cache_size, delete its
# priority entry and its uuid index entry. With clear=True it first asks
# obj.clear() to discard its contents, and logs (with obj.in_use()) when
# that fails.
144 if clear and not obj.clear():
145 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
147 self._total -= obj.cache_size
148 del self._entries[obj.cache_priority]
150 del self._by_uuid[obj.cache_uuid]
151 obj.cache_uuid = None
153 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
# Eviction pass. Its `def` line is not visible here; it is presumably
# cap_cache(), which Operations.release() calls. Once the total exceeds cap,
# entries are cleared oldest first until the total drops below cap or fewer
# than min_entries remain. The keys are copied into a list because _remove()
# deletes from _entries while the loop runs.
157 if self._total > self.cap:
158 for key in list(self._entries.keys()):
159 if self._total < self.cap or len(self._entries) < self.min_entries:
161 self._remove(self._entries[key], True)
163 def manage(self, obj):
# Register obj: give it the next priority, record cache_size and cache_uuid,
# index it by uuid and add its size to the running total.
# NOTE(review): the total is incremented with a second obj.objsize() call
# rather than the cache_size recorded just above, while _remove() subtracts
# cache_size. If objsize() changes between the two calls, _total drifts;
# `self._total += obj.cache_size` would keep both sides symmetric.
# NOTE(review): the closing `obj.cache_priority = None` presumably belongs to
# an `else:` branch whose header is missing from this dump.
165 obj.cache_priority = next(self._counter)
166 obj.cache_size = obj.objsize()
167 self._entries[obj.cache_priority] = obj
168 obj.cache_uuid = obj.uuid()
170 self._by_uuid[obj.cache_uuid] = obj
171 self._total += obj.objsize()
172 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
175 obj.cache_priority = None
177 def touch(self, obj):
# Mark obj as recently used. If it is already tracked, its old entry is
# dropped without clearing its contents. The re-registration that presumably
# follows is not visible in this dump.
179 if obj.cache_priority in self._entries:
180 self._remove(obj, False)
183 def unmanage(self, obj):
# Stop tracking obj and clear it, but only if it is persisted and currently
# tracked.
184 if obj.persisted() and obj.cache_priority in self._entries:
185 self._remove(obj, True)
187 def find(self, uuid):
# Return the managed object registered under `uuid`, or None.
188 return self._by_uuid.get(uuid)
# Inode table: maps inode numbers to File/Directory objects and keeps the
# InodeCache informed as entries are added, touched and removed.
# NOTE(review): lines are missing from this dump (the inner numbering skips e.g.
# 195, 207, 210, 223, 226, 233, 235-237). self._entries is used but its
# initialisation is not visible, nor are the headers of the two accessors
# that return iterkeys()/items().
190 class Inodes(object):
191 """Manage the set of inodes. This is the mapping from a numeric id
192 to a concrete File or Directory object"""
194 def __init__(self, inode_cache, encoding="utf-8"):
# Inode numbers are handed out sequentially, starting at llfuse.ROOT_INODE.
# `encoding` is read by Operations when decoding/encoding entry names.
196 self._counter = itertools.count(llfuse.ROOT_INODE)
197 self.inode_cache = inode_cache
198 self.encoding = encoding
199 self.deferred_invalidations = []
201 def __getitem__(self, item):
# Mapping-style access delegates straight to the inode -> object dict.
202 return self._entries[item]
204 def __setitem__(self, key, item):
205 self._entries[key] = item
# Body of an iterator over inode numbers (method header not visible).
208 return self._entries.iterkeys()
# Body of an items() accessor (header not visible); Operations iterates
# self.inodes.items() when finalizing entries.
211 return self._entries.items()
213 def __contains__(self, k):
214 return k in self._entries
216 def touch(self, entry):
# Record an access: stamp entry._atime and let the InodeCache refresh its
# recency bookkeeping.
217 entry._atime = time.time()
218 self.inode_cache.touch(entry)
220 def add_entry(self, entry):
# Assign the next inode number, store the entry and hand it to the cache.
# The root inode gets extra handling on a line not visible in this dump.
221 entry.inode = next(self._counter)
222 if entry.inode == llfuse.ROOT_INODE:
224 self._entries[entry.inode] = entry
225 self.inode_cache.manage(entry)
228 def del_entry(self, entry):
# When no references remain: stop cache management, drop the entry from the
# table, and invalidate the inode in the kernel. The block under
# `llfuse.lock_released` runs without llfuse's global lock; its body is not
# visible here. The debug line presumably belongs to the non-zero-refcount
# branch, whose header is missing.
229 if entry.ref_count == 0:
230 self.inode_cache.unmanage(entry)
231 del self._entries[entry.inode]
232 with llfuse.lock_released:
234 self.invalidate_inode(entry.inode)
238 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
# Thin wrappers over llfuse's kernel cache invalidation calls.
240 def invalidate_inode(self, inode):
241 llfuse.invalidate_inode(inode)
243 def invalidate_entry(self, inode, name):
244 llfuse.invalidate_entry(inode, name)
# Decorator for FUSE request handlers. It converts exceptions escaping the
# wrapped method into llfuse.FUSEError so the kernel receives an errno:
#   - EnvironmentError            -> FUSEError(e.errno)
#   - arvados KeepWriteError      -> logged, FUSEError(EIO)
#   - arvados NotFoundError       -> logged, FUSEError(EIO)
#   - anything else               -> logged with traceback, FUSEError(EIO)
# NOTE(review): inner lines 252, 255 and 264 are missing from this dump. They
# presumably hold the `try:`, the body of the FUSEError clause (re-raising it
# unchanged), and the catch-all `except` header before the final logging.
247 def catch_exceptions(orig_func):
248 """Catch uncaught exceptions and log them consistently."""
# functools.wraps keeps the handler's name/docstring on the wrapper.
250 @functools.wraps(orig_func)
251 def catch_exceptions_wrapper(self, *args, **kwargs):
253 return orig_func(self, *args, **kwargs)
254 except llfuse.FUSEError:
256 except EnvironmentError as e:
257 raise llfuse.FUSEError(e.errno)
258 except arvados.errors.KeepWriteError as e:
259 _logger.error("Keep write error: " + str(e))
260 raise llfuse.FUSEError(errno.EIO)
261 except arvados.errors.NotFoundError as e:
262 _logger.error("Block not found error: " + str(e))
263 raise llfuse.FUSEError(errno.EIO)
265 _logger.exception("Unhandled exception during FUSE operation")
266 raise llfuse.FUSEError(errno.EIO)
268 return catch_exceptions_wrapper
# Request handler object for one FUSE mount, driven by llfuse.
271 class Operations(llfuse.Operations):
272 """This is the main interface with llfuse.
274 The methods on this object are called by llfuse threads to service FUSE
275 events to query, read from, and write to the file system.
277 llfuse has its own global lock which is acquired before calling a request handler,
278 so request handlers do not run concurrently unless the lock is explicitly released
279 using 'with llfuse.lock_released:'
283 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
# uid, gid: presumably stored as self.uid/self.gid (those lines are not
#   visible here); getattr() reports them as the owner of every entry.
# encoding: passed to Inodes; used to decode incoming names (lookup) and to
#   encode outgoing names (readdir).
# inode_cache: optional InodeCache. Presumably when none is supplied, a cache
#   capped at 256*1024*1024 is created (the guarding `if` is not visible).
# num_retries: forwarded to file objects' readfrom()/writeto().
# enable_write: when False, _check_writable() fails with EROFS and getattr()
#   never reports write permission bits.
284 super(Operations, self).__init__()
287 inode_cache = InodeCache(cap=256*1024*1024)
288 self.inodes = Inodes(inode_cache, encoding=encoding)
291 self.enable_write = enable_write
293 # dict of file handle number to FileHandle/DirectoryHandle object
294 self._filehandles = {}
# Monotonic source of handle numbers for open()/opendir()/create().
295 self._filehandles_counter = itertools.count(0)
297 # Other threads that need to wait until the fuse driver
298 # is fully initialized should wait() on this event object.
299 self.initlock = threading.Event()
301 self.num_retries = num_retries
# NOTE(review): the method headers enclosing these fragments are missing from
# this dump (the inner numbering jumps 301->306 and 307->316).
# Fragment of an initialisation hook. Presumably it sets self.initlock so that
# threads blocked in initlock.wait() may proceed.
306 # Allow threads that are waiting for the driver to be finished
307 # initializing to continue
# Fragment of a shutdown hook. It walks every inode entry and, when the
# (not visible) per-entry finalize step raises, logs the failure with the
# inode number via _logger.exception.
316 for k,v in self.inodes.items():
319 except Exception as e:
320 _logger.exception("Error during finalize of inode %i", k)
# Permission-check hook. Its body (inner line 324) is not visible in this dump,
# so the result it returns cannot be described from here.
323 def access(self, inode, mode, ctx):
326 def listen_for_events(self, api_client):
# Subscribe to the Arvados event bus, filtered to create/update/delete
# events, and keep the subscription in self.events.
# NOTE(review): the remaining arguments of subscribe() (inner lines 329-330)
# are not visible; presumably they include the self.on_event callback.
327 self.events = arvados.events.subscribe(api_client,
328 [["event_type", "in", ["create", "update", "delete"]]],
332 def on_event(self, ev):
# Event-bus callback: refresh cached objects affected by an event.
# - The object itself: collections are updated to the record version carried
#   in the event.
# - Its previous owner (from old_attributes), if cached: invalidated and
#   updated.
# - Its current owner, if cached: invalidated.
# NOTE(review): several lines are missing from this dump (e.g. inner 334,
# 336-337, 340, 345, 347-349, 355, 359+), including the guards around `item`,
# so the exact branching is not visible.
333 if 'event_type' in ev:
335 item = self.inodes.inode_cache.find(ev["object_uuid"])
338 if ev["object_kind"] == "arvados#collection":
# Evaluates to the new_attributes dict, or to a falsy value when "properties"
# or "new_attributes" is missing. (The third operand repeats the second
# lookup; ev.get("properties", {}).get("new_attributes") would be simpler.)
339 new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
341 # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
342 # should always be the same.
343 #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
344 record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
346 item.update(to_record_version=record_version)
# Owner before the change, if the event reports one, so that the old parent's
# listing can be refreshed too.
350 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
351 olditemparent = self.inodes.inode_cache.find(oldowner)
352 if olditemparent is not None:
353 olditemparent.invalidate()
354 olditemparent.update()
356 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
357 if itemparent is not None:
358 itemparent.invalidate()
363 def getattr(self, inode):
# Build an llfuse.EntryAttributes for `inode`; raises ENOENT if it is unknown.
# Mode bits:
# - read for user/group/other on everything;
# - directories also get execute bits and S_IFDIR;
# - regular files get S_IFREG, plus execute bits for FuseArvadosFile;
# - write bits only when the mount allows writes and e.writable() is true.
# NOTE(review): lines are missing from this dump (e.g. inner 368, 370-371, 374,
# 378, 382, 385-386, 389-390, 392, 398-401). They presumably include the
# `else:` before the regular-file branch and the final return of `entry`.
364 if inode not in self.inodes:
365 raise llfuse.FUSEError(errno.ENOENT)
367 e = self.inodes[inode]
369 entry = llfuse.EntryAttributes()
# The kernel may cache the name lookup and the attributes for 60 seconds.
372 entry.entry_timeout = 60
373 entry.attr_timeout = 60
375 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
376 if isinstance(e, Directory):
377 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
379 entry.st_mode |= stat.S_IFREG
380 if isinstance(e, FuseArvadosFile):
381 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
383 if self.enable_write and e.writable():
384 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
386 # Every entry is reported as owned by the mount's configured uid/gid.
387 entry.st_uid = self.uid
388 entry.st_gid = self.gid
391 entry.st_size = e.size()
393 entry.st_blksize = 512
# NOTE(review): (size/512)+1 counts one extra block whenever the size is an
# exact multiple of 512, and reports 1 block for an empty file;
# (entry.st_size + 511) // 512 would round up exactly. On Python 2, `/` on
# ints floors.
394 entry.st_blocks = (entry.st_size/512)+1
395 entry.st_atime = int(e.atime())
396 entry.st_mtime = int(e.mtime())
# No separate change time is tracked; ctime mirrors mtime.
397 entry.st_ctime = int(e.mtime())
402 def setattr(self, inode, attr):
# Apply attribute changes. The visible lines act only on a size change for a
# FuseArvadosFile: they truncate the underlying arvfile (with llfuse's global
# lock released) and report the resulting size. The result starts from the
# current getattr() attributes.
# NOTE(review): inner lines 404, 406 and 411-414 (including the return) are
# not visible in this dump.
403 entry = self.getattr(inode)
405 e = self.inodes[inode]
407 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
408 with llfuse.lock_released:
409 e.arvfile.truncate(attr.st_size)
410 entry.st_size = e.arvfile.size()
415 def lookup(self, parent_inode, name):
# Resolve `name` inside directory `parent_inode`. On success the child's
# reference count is incremented and its attributes are returned. Otherwise
# ENOENT is raised.
# NOTE(review): inner lines 417-421, 424, 428-429, 434 and 436 are missing.
# They presumably include the initialisation of `inode`, the test for a
# special name (likely '..') that maps to p.parent_inode, and the tail of the
# "not found" log call.
# `name` is decoded with the mount's encoding (Python 2 `unicode`).
416 name = unicode(name, self.inodes.encoding)
422 if parent_inode in self.inodes:
423 p = self.inodes[parent_inode]
425 inode = p.parent_inode
426 elif isinstance(p, Directory) and name in p:
427 inode = p[name].inode
430 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
431 parent_inode, name, inode)
432 self.inodes[inode].inc_ref()
433 return self.getattr(inode)
435 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
437 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release kernel lookup references for a batch of inodes.

    `inodes` is an iterable of (inode, nlookup) pairs. Each entry's reference
    count is lowered by nlookup; when dec_ref() returns 0 and the entry has
    already been marked dead, it is removed from the inode table.
    """
    for inode_num, lookup_count in inodes:
        fs_obj = self.inodes[inode_num]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode_num, lookup_count, fs_obj.ref_count)
        remaining = fs_obj.dec_ref(lookup_count)
        # Only dead entries are deleted here; live ones stay in the table even
        # with no outstanding kernel references.
        if remaining == 0 and fs_obj.dead:
            self.inodes.del_entry(fs_obj)
448 def open(self, inode, flags):
# Open a regular file and register a new FileHandle under a fresh handle
# number. Errors: ENOENT if the inode is unknown, EISDIR for a directory,
# EPERM if opened with O_WRONLY/O_RDWR while the object is not writable().
# NOTE(review): unlike _check_writable(), this does not consult
# self.enable_write; confirm that p.writable() already accounts for
# read-only mounts.
# NOTE(review): inner lines 451, 453, 456, 459 and 462-465 are missing; the
# `else:` before ENOENT and the return of `fh` are presumably among them.
449 if inode in self.inodes:
450 p = self.inodes[inode]
452 raise llfuse.FUSEError(errno.ENOENT)
454 if isinstance(p, Directory):
455 raise llfuse.FUSEError(errno.EISDIR)
457 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
458 raise llfuse.FUSEError(errno.EPERM)
460 fh = next(self._filehandles_counter)
461 self._filehandles[fh] = FileHandle(fh, p)
466 def read(self, fh, off, size):
# Read through an open handle, starting at offset `off`, by delegating to
# obj.readfrom(off, size, num_retries). Raises EBADF for an unknown handle.
# The object is marked as recently used before reading.
467 _logger.debug("arv-mount read %i %i %i", fh, off, size)
468 if fh in self._filehandles:
469 handle = self._filehandles[fh]
471 raise llfuse.FUSEError(errno.EBADF)
473 self.inodes.touch(handle.obj)
475 return handle.obj.readfrom(off, size, self.num_retries)
478 def write(self, fh, off, buf):
# Write `buf` at offset `off` through an open handle and return whatever
# obj.writeto() returns. Raises EBADF for an unknown handle and EPERM if the
# object is not writable(). The object is marked as recently used first.
479 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
480 if fh in self._filehandles:
481 handle = self._filehandles[fh]
483 raise llfuse.FUSEError(errno.EBADF)
485 if not handle.obj.writable():
486 raise llfuse.FUSEError(errno.EPERM)
488 self.inodes.touch(handle.obj)
490 return handle.obj.writeto(off, buf, self.num_retries)
493 def release(self, fh):
# Close a handle: flush it, release it and forget its number, then call
# inode_cache.cap_cache(), presumably to enforce the cache limit now that the
# object may no longer be in use.
# NOTE(review): inner lines 495 and 497-499 are missing. They presumably wrap
# the flush in try/finally so that release still happens if flushing fails.
494 if fh in self._filehandles:
496 self._filehandles[fh].flush()
500 self._filehandles[fh].release()
501 del self._filehandles[fh]
502 self.inodes.inode_cache.cap_cache()
# Close a directory handle. The body (inner lines 505-507) is not visible in
# this dump.
504 def releasedir(self, fh):
508 def opendir(self, inode):
# Open a directory. Errors: ENOENT if the inode is unknown, ENOTDIR if it is
# not a Directory, EIO if its parent inode is missing from the table.
# The handle stores a snapshot of the listing taken now ('.', '..', then the
# directory's items) for readdir() to walk.
# NOTE(review): inner lines 510, 513, 515, 518, 522 and 524-531 are missing,
# including the `else:` lines and the return of `fh`.
509 _logger.debug("arv-mount opendir: inode %i", inode)
511 if inode in self.inodes:
512 p = self.inodes[inode]
514 raise llfuse.FUSEError(errno.ENOENT)
516 if not isinstance(p, Directory):
517 raise llfuse.FUSEError(errno.ENOTDIR)
# The handle number is taken before the parent check, so a number is consumed
# even when EIO is raised.
519 fh = next(self._filehandles_counter)
520 if p.parent_inode in self.inodes:
521 parent = self.inodes[p.parent_inode]
523 raise llfuse.FUSEError(errno.EIO)
528 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
532 def readdir(self, fh, off):
# Generator over the handle's listing snapshot. It yields
# (name encoded with the mount encoding, attributes, offset of the next entry)
# and skips entries whose inode is no longer in the table. Raises EBADF for
# an unknown handle.
# NOTE(review): the initialisation of `e` (presumably from `off`) and its
# increment (inner lines 541-542 and 546-547) are not visible in this dump.
533 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
535 if fh in self._filehandles:
536 handle = self._filehandles[fh]
538 raise llfuse.FUSEError(errno.EBADF)
540 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
543 while e < len(handle.entries):
544 if handle.entries[e][1].inode in self.inodes:
545 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
# Fragment of the filesystem-statistics handler. Its `def` line (inner 549)
# and the remaining fields (inner 552-563) are not visible. The visible part
# reports a 128 KiB block size.
550 st = llfuse.StatvfsData()
551 st.f_bsize = 128 * 1024
564 def _check_writable(self, inode_parent):
# Validate that `inode_parent` is a directory that may be modified, and hand
# the directory object back to the mutating handlers (create, mkdir, unlink,
# rmdir, rename). Errors: EROFS if the mount is read-only, ENOENT if the
# inode is unknown, ENOTDIR if it is not a Directory, EPERM when the
# directory does not permit the change.
# NOTE(review): the condition guarding EPERM and the return of the directory
# (inner lines 570, 572, 575-576 and 578-581) are not visible in this dump.
565 if not self.enable_write:
566 raise llfuse.FUSEError(errno.EROFS)
568 if inode_parent in self.inodes:
569 p = self.inodes[inode_parent]
571 raise llfuse.FUSEError(errno.ENOENT)
573 if not isinstance(p, Directory):
574 raise llfuse.FUSEError(errno.ENOTDIR)
577 raise llfuse.FUSEError(errno.EPERM)
582 def create(self, inode_parent, name, mode, flags, ctx):
# Create file `name` in a writable directory, open it, and return
# (handle number, attributes).
# NOTE(review): the creation call and the lookup that binds `f` (inner lines
# 584, 586-587, 589 and 592-594) are not visible in this dump.
583 _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
585 p = self._check_writable(inode_parent)
588 # The file entry should have been implicitly created by callback.
590 fh = next(self._filehandles_counter)
591 self._filehandles[fh] = FileHandle(fh, f)
595 return (fh, self.getattr(f.inode))
598 def mkdir(self, inode_parent, name, mode, ctx):
# Create directory `name` in a writable directory and return its attributes.
# NOTE(review): the creation call and the lookup that binds `d` (inner lines
# 600, 602-603 and 605-607) are not visible in this dump.
599 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
601 p = self._check_writable(inode_parent)
604 # The dir entry should have been implicitly created by callback.
608 return self.getattr(d.inode)
611 def unlink(self, inode_parent, name):
# Remove file `name` from a writable directory. The removal call itself
# (inner lines 614-616) is not visible in this dump.
612 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
613 p = self._check_writable(inode_parent)
617 def rmdir(self, inode_parent, name):
# Remove directory `name` from a writable directory. The removal call itself
# (inner lines 620-622) is not visible in this dump.
618 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
619 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Rename `name_old` in directory `inode_parent_old` to `name_new` in
    directory `inode_parent_new`.

    Both directories must pass _check_writable() (the old one is checked
    first). The move is then carried out by the destination directory's
    rename(), which receives the source directory object.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    from_dir = self._check_writable(inode_parent_old)
    to_dir = self._check_writable(inode_parent_new)
    to_dir.rename(name_old, name_new, from_dir)
# Fragment of the flush handler (its `def` line, inner 630, is not visible).
# It flushes the handle when the handle number is known and silently ignores
# unknown handles.
631 if fh in self._filehandles:
632 self._filehandles[fh].flush()
# fsync handler. Its body (inner lines 635-636) is not visible in this dump.
634 def fsync(self, fh, datasync):
# fsyncdir handler. Its body lies beyond the end of this dump.
637 def fsyncdir(self, fh, datasync):