1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
File objects inherit from `fusefile.File`. The key methods are `readfrom` and
`writeto`, which implement the actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
# Default _notify_queue has a limit of 1000 items, but it really needs to be
# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# details.
# Queue.Queue() constructed without a maxsize argument is unbounded.
# NOTE(review): this replaces a private llfuse attribute at import time, so it
# depends on llfuse internals; re-check it whenever llfuse is upgraded.
llfuse.capi._notify_queue = Queue.Queue()
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
# Logger used by the handlers in this module for debug traces and error
# reports (e.g. "arv-mount ..." debug lines, "Keep write error: ...").
_logger = logging.getLogger('arvados.arvados_fuse')

# Uncomment this to enable llfuse debug logging.
# log_handler = logging.StreamHandler()
# llogger = logging.getLogger('llfuse')
# llogger.addHandler(log_handler)
# llogger.setLevel(logging.DEBUG)
90 """Connects a numeric file handle to a File or Directory object that has
91 been opened by the client."""
93 def __init__(self, fh, obj):
102 if self.obj.writable():
103 return self.obj.flush()
class FileHandle(Handle):
    """Handle for a regular file that a FUSE client has opened.

    Pairs a numeric FUSE file handle with the File object it refers to;
    all behavior comes from `Handle`.
    """
class DirectoryHandle(Handle):
    """Handle for a directory that a FUSE client has opened.

    Besides the `Handle` state, it keeps `entries`: the listing captured
    when the directory was opened, as a list of (name, object) pairs, which
    `readdir` pages through by offset.
    """

    def __init__(self, fh, dirobj, entries):
        # Base-class setup with the numeric handle and the directory object.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        # Listing snapshot taken at open time.
        self.entries = entries
121 class InodeCache(object):
122 """Records the memory footprint of objects and when they are last used.
124 When the cache limit is exceeded, the least recently used objects are
125 cleared. Clearing the object means discarding its contents to release
126 memory. The next time the object is accessed, it must be re-fetched from
127 the server. Note that the inode cache limit is a soft limit; the cache
128 limit may be exceeded if necessary to load very large objects, it may also
129 be exceeded if open file handles prevent objects from being cleared.
133 def __init__(self, cap, min_entries=4):
134 self._entries = collections.OrderedDict()
136 self._counter = itertools.count(0)
139 self.min_entries = min_entries
144 def _remove(self, obj, clear):
145 if clear and not obj.clear():
146 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
148 self._total -= obj.cache_size
149 del self._entries[obj.cache_priority]
151 del self._by_uuid[obj.cache_uuid]
152 obj.cache_uuid = None
154 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
158 if self._total > self.cap:
159 for key in list(self._entries.keys()):
160 if self._total < self.cap or len(self._entries) < self.min_entries:
162 self._remove(self._entries[key], True)
164 def manage(self, obj):
166 obj.cache_priority = next(self._counter)
167 obj.cache_size = obj.objsize()
168 self._entries[obj.cache_priority] = obj
169 obj.cache_uuid = obj.uuid()
171 self._by_uuid[obj.cache_uuid] = obj
172 self._total += obj.objsize()
173 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
176 obj.cache_priority = None
178 def touch(self, obj):
180 if obj.cache_priority in self._entries:
181 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking `obj` in the LRU table and ask for it to be cleared.

    Nothing happens unless `obj.persisted()` is true AND the object's
    `cache_priority` is currently a key of the LRU table; in that case it is
    handed to `_remove` with clear=True.
    """
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    self._remove(obj, True)
def find(self, uuid):
    """Return the object registered in this cache under `uuid`, or None."""
    registry = self._by_uuid
    return registry[uuid] if uuid in registry else None
191 class Inodes(object):
192 """Manage the set of inodes. This is the mapping from a numeric id
193 to a concrete File or Directory object"""
195 def __init__(self, inode_cache, encoding="utf-8"):
197 self._counter = itertools.count(llfuse.ROOT_INODE)
198 self.inode_cache = inode_cache
199 self.encoding = encoding
200 self.deferred_invalidations = []
def __getitem__(self, item):
    """Return the file system object registered under inode number `item`.

    An unknown inode propagates the underlying mapping's lookup error.
    """
    table = self._entries
    return table[item]

def __setitem__(self, key, item):
    """Register file system object `item` under inode number `key`."""
    table = self._entries
    table[key] = item
209 return self._entries.iterkeys()
212 return self._entries.items()
def __contains__(self, k):
    """Report whether inode number `k` is currently registered."""
    known = self._entries
    return k in known

def touch(self, entry):
    """Record a use of `entry`.

    Stamps `entry._atime` with the current time, then passes the entry to
    the inode cache's `touch` so the cache can update its recency
    bookkeeping.
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
221 def add_entry(self, entry):
222 entry.inode = next(self._counter)
223 if entry.inode == llfuse.ROOT_INODE:
225 self._entries[entry.inode] = entry
226 self.inode_cache.manage(entry)
229 def del_entry(self, entry):
230 if entry.ref_count == 0:
231 self.inode_cache.unmanage(entry)
232 del self._entries[entry.inode]
233 with llfuse.lock_released:
235 self.invalidate_inode(entry.inode)
239 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Forward to `llfuse.invalidate_inode` for inode number `inode`.

    Per the llfuse documentation this asks the kernel to discard what it has
    cached for the inode.
    """
    notify_kernel = llfuse.invalidate_inode
    notify_kernel(inode)

def invalidate_entry(self, inode, name):
    """Forward to `llfuse.invalidate_entry` for entry `name` in directory `inode`.

    Per the llfuse documentation this asks the kernel to drop its cached
    lookup of that directory entry.
    """
    notify_kernel = llfuse.invalidate_entry
    notify_kernel(inode, name)
248 def catch_exceptions(orig_func):
249 """Catch uncaught exceptions and log them consistently."""
251 @functools.wraps(orig_func)
252 def catch_exceptions_wrapper(self, *args, **kwargs):
254 return orig_func(self, *args, **kwargs)
255 except llfuse.FUSEError:
257 except EnvironmentError as e:
258 raise llfuse.FUSEError(e.errno)
259 except arvados.errors.KeepWriteError as e:
260 _logger.error("Keep write error: " + str(e))
261 raise llfuse.FUSEError(errno.EIO)
262 except arvados.errors.NotFoundError as e:
263 _logger.error("Block not found error: " + str(e))
264 raise llfuse.FUSEError(errno.EIO)
266 _logger.exception("Unhandled exception during FUSE operation")
267 raise llfuse.FUSEError(errno.EIO)
269 return catch_exceptions_wrapper
272 class Operations(llfuse.Operations):
273 """This is the main interface with llfuse.
275 The methods on this object are called by llfuse threads to service FUSE
276 events to query and read from the file system.
278 llfuse has its own global lock which is acquired before calling a request handler,
279 so request handlers do not run concurrently unless the lock is explicitly released
280 using 'with llfuse.lock_released:'
284 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
285 super(Operations, self).__init__()
288 inode_cache = InodeCache(cap=256*1024*1024)
289 self.inodes = Inodes(inode_cache, encoding=encoding)
292 self.enable_write = enable_write
294 # dict of inode to filehandle
295 self._filehandles = {}
296 self._filehandles_counter = itertools.count(0)
298 # Other threads that need to wait until the fuse driver
299 # is fully initialized should wait() on this event object.
300 self.initlock = threading.Event()
302 self.num_retries = num_retries
307 # Allow threads that are waiting for the driver to be finished
308 # initializing to continue
317 for k,v in self.inodes.items():
320 except Exception as e:
321 _logger.exception("Error during finalize of inode %i", k)
324 def access(self, inode, mode, ctx):
327 def listen_for_events(self, api_client):
328 self.events = arvados.events.subscribe(api_client,
329 [["event_type", "in", ["create", "update", "delete"]]],
333 def on_event(self, ev):
334 if 'event_type' in ev:
336 item = self.inodes.inode_cache.find(ev["object_uuid"])
339 if ev["object_kind"] == "arvados#collection":
340 new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
342 # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
343 # should always be the same.
344 #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
345 record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
347 item.update(to_record_version=record_version)
351 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
352 olditemparent = self.inodes.inode_cache.find(oldowner)
353 if olditemparent is not None:
354 olditemparent.invalidate()
355 olditemparent.update()
357 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
358 if itemparent is not None:
359 itemparent.invalidate()
364 def getattr(self, inode):
365 if inode not in self.inodes:
366 raise llfuse.FUSEError(errno.ENOENT)
368 e = self.inodes[inode]
370 entry = llfuse.EntryAttributes()
373 entry.entry_timeout = 60
374 entry.attr_timeout = 60
376 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
377 if isinstance(e, Directory):
378 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
380 entry.st_mode |= stat.S_IFREG
381 if isinstance(e, FuseArvadosFile):
382 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
384 if self.enable_write and e.writable():
385 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
388 entry.st_uid = self.uid
389 entry.st_gid = self.gid
392 entry.st_size = e.size()
394 entry.st_blksize = 512
395 entry.st_blocks = (entry.st_size/512)+1
396 entry.st_atime = int(e.atime())
397 entry.st_mtime = int(e.mtime())
398 entry.st_ctime = int(e.mtime())
403 def setattr(self, inode, attr):
404 entry = self.getattr(inode)
406 e = self.inodes[inode]
408 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
409 with llfuse.lock_released:
410 e.arvfile.truncate(attr.st_size)
411 entry.st_size = e.arvfile.size()
416 def lookup(self, parent_inode, name):
417 name = unicode(name, self.inodes.encoding)
423 if parent_inode in self.inodes:
424 p = self.inodes[parent_inode]
426 inode = p.parent_inode
427 elif isinstance(p, Directory) and name in p:
428 inode = p[name].inode
431 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
432 parent_inode, name, inode)
433 self.inodes[inode].inc_ref()
434 return self.getattr(inode)
436 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
438 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release kernel lookup references for a batch of inodes.

    `inodes` is an iterable of (inode, nlookup) pairs.  For each known inode
    the entry's references are released with `dec_ref(nlookup)`; when that
    returns 0 and the entry is marked `dead`, it is removed from the inode
    table via `del_entry`.

    Inode numbers that are not in the table are logged and skipped instead
    of raising KeyError.  Raising would abort the loop and leave the
    references of every remaining pair in the batch unreleased.
    """
    for inode, nlookup in inodes:
        if inode not in self.inodes:
            # Nothing to release for this one; keep processing the batch.
            _logger.debug("arv-mount forget: inode %i nlookup %i not found", inode, nlookup)
            continue
        ent = self.inodes[inode]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
        if ent.dec_ref(nlookup) == 0 and ent.dead:
            self.inodes.del_entry(ent)
449 def open(self, inode, flags):
450 if inode in self.inodes:
451 p = self.inodes[inode]
453 raise llfuse.FUSEError(errno.ENOENT)
455 if isinstance(p, Directory):
456 raise llfuse.FUSEError(errno.EISDIR)
458 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
459 raise llfuse.FUSEError(errno.EPERM)
461 fh = next(self._filehandles_counter)
462 self._filehandles[fh] = FileHandle(fh, p)
467 def read(self, fh, off, size):
468 _logger.debug("arv-mount read %i %i %i", fh, off, size)
469 if fh in self._filehandles:
470 handle = self._filehandles[fh]
472 raise llfuse.FUSEError(errno.EBADF)
474 self.inodes.touch(handle.obj)
476 return handle.obj.readfrom(off, size, self.num_retries)
479 def write(self, fh, off, buf):
480 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
481 if fh in self._filehandles:
482 handle = self._filehandles[fh]
484 raise llfuse.FUSEError(errno.EBADF)
486 if not handle.obj.writable():
487 raise llfuse.FUSEError(errno.EPERM)
489 self.inodes.touch(handle.obj)
491 return handle.obj.writeto(off, buf, self.num_retries)
494 def release(self, fh):
495 if fh in self._filehandles:
497 self._filehandles[fh].flush()
501 self._filehandles[fh].release()
502 del self._filehandles[fh]
503 self.inodes.inode_cache.cap_cache()
505 def releasedir(self, fh):
509 def opendir(self, inode):
510 _logger.debug("arv-mount opendir: inode %i", inode)
512 if inode in self.inodes:
513 p = self.inodes[inode]
515 raise llfuse.FUSEError(errno.ENOENT)
517 if not isinstance(p, Directory):
518 raise llfuse.FUSEError(errno.ENOTDIR)
520 fh = next(self._filehandles_counter)
521 if p.parent_inode in self.inodes:
522 parent = self.inodes[p.parent_inode]
524 raise llfuse.FUSEError(errno.EIO)
529 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
533 def readdir(self, fh, off):
534 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
536 if fh in self._filehandles:
537 handle = self._filehandles[fh]
539 raise llfuse.FUSEError(errno.EBADF)
541 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
544 while e < len(handle.entries):
545 if handle.entries[e][1].inode in self.inodes:
546 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
551 st = llfuse.StatvfsData()
552 st.f_bsize = 128 * 1024
565 def _check_writable(self, inode_parent):
566 if not self.enable_write:
567 raise llfuse.FUSEError(errno.EROFS)
569 if inode_parent in self.inodes:
570 p = self.inodes[inode_parent]
572 raise llfuse.FUSEError(errno.ENOENT)
574 if not isinstance(p, Directory):
575 raise llfuse.FUSEError(errno.ENOTDIR)
578 raise llfuse.FUSEError(errno.EPERM)
583 def create(self, inode_parent, name, mode, flags, ctx):
584 _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
586 p = self._check_writable(inode_parent)
589 # The file entry should have been implicitly created by callback.
591 fh = next(self._filehandles_counter)
592 self._filehandles[fh] = FileHandle(fh, f)
596 return (fh, self.getattr(f.inode))
599 def mkdir(self, inode_parent, name, mode, ctx):
600 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
602 p = self._check_writable(inode_parent)
605 # The dir entry should have been implicitly created by callback.
609 return self.getattr(d.inode)
612 def unlink(self, inode_parent, name):
613 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
614 p = self._check_writable(inode_parent)
618 def rmdir(self, inode_parent, name):
619 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
620 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move entry `name_old` of directory `inode_parent_old` to `name_new`
    in directory `inode_parent_new`.

    Both parents go through `_check_writable` (source first, then
    destination), which raises FUSEError when a parent is unusable.  The
    move is delegated to the destination directory's `rename`, which is
    given the source directory.
    """
    log_args = (inode_parent_old, name_old, inode_parent_new, name_new)
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", *log_args)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
632 if fh in self._filehandles:
633 self._filehandles[fh].flush()
635 def fsync(self, fh, datasync):
638 def fsyncdir(self, fh, datasync):