1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
72 # Default _notify_queue has a limit of 1000 items, but it really needs to be
73 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
76 llfuse.capi._notify_queue = Queue.Queue()
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
81 _logger = logging.getLogger('arvados.arvados_fuse')
83 # Uncomment this to enable llfuse debug logging.
84 # log_handler = logging.StreamHandler()
85 # llogger = logging.getLogger('llfuse')
86 # llogger.addHandler(log_handler)
87 # llogger.setLevel(logging.DEBUG)
90 """Connects a numeric file handle to a File or Directory object that has
91 been opened by the client."""
93 def __init__(self, fh, obj):
102 if self.obj.writable():
103 return self.obj.flush()
106 class FileHandle(Handle):
107 """Connects a numeric file handle to a File object that has
108 been opened by the client."""
class DirectoryHandle(Handle):
    """File handle for a Directory object opened by the client.

    On top of what ``Handle`` keeps (the numeric handle and the opened
    object), this records ``entries``: the list of ``(name, object)`` pairs
    built when the directory was opened, which directory reads walk by
    position.
    """

    def __init__(self, fh, dirobj, entries):
        # Handle.__init__ takes (fh, obj); the directory is the opened object.
        super(DirectoryHandle, self).__init__(fh, obj=dirobj)
        # Snapshot of the listing taken at open time.
        self.entries = entries
121 class InodeCache(object):
122 """Records the memory footprint of objects and when they are last used.
124 When the cache limit is exceeded, the least recently used objects are
125 cleared. Clearing the object means discarding its contents to release
126 memory. The next time the object is accessed, it must be re-fetched from
127 the server. Note that the inode cache limit is a soft limit; the cache
128 limit may be exceeded if necessary to load very large objects, it may also
129 be exceeded if open file handles prevent objects from being cleared.
133 def __init__(self, cap, min_entries=4):
134 self._entries = collections.OrderedDict()
136 self._counter = itertools.count(0)
139 self.min_entries = min_entries
144 def _remove(self, obj, clear):
145 if clear and not obj.clear():
146 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
148 self._total -= obj.cache_size
149 del self._entries[obj.cache_priority]
151 del self._by_uuid[obj.cache_uuid]
152 obj.cache_uuid = None
154 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
158 if self._total > self.cap:
159 for key in list(self._entries.keys()):
160 if self._total < self.cap or len(self._entries) < self.min_entries:
162 self._remove(self._entries[key], True)
164 def manage(self, obj):
166 obj.cache_priority = next(self._counter)
167 obj.cache_size = obj.objsize()
168 self._entries[obj.cache_priority] = obj
169 obj.cache_uuid = obj.uuid()
171 self._by_uuid[obj.cache_uuid] = obj
172 self._total += obj.objsize()
173 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
176 obj.cache_priority = None
178 def touch(self, obj):
180 if obj.cache_priority in self._entries:
181 self._remove(obj, False)
184 def unmanage(self, obj):
185 if obj.persisted() and obj.cache_priority in self._entries:
186 self._remove(obj, True)
188 def find(self, uuid):
189 return self._by_uuid.get(uuid)
192 self._entries.clear()
193 self._by_uuid.clear()
196 class Inodes(object):
197 """Manage the set of inodes. This is the mapping from a numeric id
198 to a concrete File or Directory object"""
200 def __init__(self, inode_cache, encoding="utf-8"):
202 self._counter = itertools.count(llfuse.ROOT_INODE)
203 self.inode_cache = inode_cache
204 self.encoding = encoding
205 self.deferred_invalidations = []
207 def __getitem__(self, item):
208 return self._entries[item]
210 def __setitem__(self, key, item):
211 self._entries[key] = item
214 return self._entries.iterkeys()
217 return self._entries.items()
219 def __contains__(self, k):
220 return k in self._entries
222 def touch(self, entry):
223 entry._atime = time.time()
224 self.inode_cache.touch(entry)
226 def add_entry(self, entry):
227 entry.inode = next(self._counter)
228 if entry.inode == llfuse.ROOT_INODE:
230 self._entries[entry.inode] = entry
231 self.inode_cache.manage(entry)
234 def del_entry(self, entry):
235 if entry.ref_count == 0:
236 self.inode_cache.unmanage(entry)
237 del self._entries[entry.inode]
238 with llfuse.lock_released:
240 self.invalidate_inode(entry.inode)
244 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Forward an inode cache invalidation for ``inode`` to llfuse.

    Pure pass-through to ``llfuse.invalidate_inode``; nothing is returned.
    """
    notify = llfuse.invalidate_inode
    notify(inode)
def invalidate_entry(self, inode, name):
    """Forward invalidation of directory entry ``name`` in ``inode`` to llfuse.

    Pure pass-through to ``llfuse.invalidate_entry``; ``name`` is not
    re-encoded here.  NOTE(review): other handlers encode names with
    ``self.encoding`` at the llfuse boundary, so callers presumably pass an
    already-encoded name -- confirm at the call sites.
    """
    notify = llfuse.invalidate_entry
    notify(inode, name)
253 self.inode_cache.clear()
255 for k,v in self._entries.items():
258 except Exception as e:
259 _logger.exception("Error during finalize of inode %i", k)
261 self._entries.clear()
264 def catch_exceptions(orig_func):
265 """Catch uncaught exceptions and log them consistently."""
267 @functools.wraps(orig_func)
268 def catch_exceptions_wrapper(self, *args, **kwargs):
270 return orig_func(self, *args, **kwargs)
271 except llfuse.FUSEError:
273 except EnvironmentError as e:
274 raise llfuse.FUSEError(e.errno)
275 except arvados.errors.KeepWriteError as e:
276 _logger.error("Keep write error: " + str(e))
277 raise llfuse.FUSEError(errno.EIO)
278 except arvados.errors.NotFoundError as e:
279 _logger.error("Block not found error: " + str(e))
280 raise llfuse.FUSEError(errno.EIO)
282 _logger.exception("Unhandled exception during FUSE operation")
283 raise llfuse.FUSEError(errno.EIO)
285 return catch_exceptions_wrapper
288 class Operations(llfuse.Operations):
289 """This is the main interface with llfuse.
291 The methods on this object are called by llfuse threads to service FUSE
292 events to query and read from the file system.
294 llfuse has its own global lock which is acquired before calling a request handler,
295 so request handlers do not run concurrently unless the lock is explicitly released
296 using 'with llfuse.lock_released:'
300 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
301 super(Operations, self).__init__()
304 inode_cache = InodeCache(cap=256*1024*1024)
305 self.inodes = Inodes(inode_cache, encoding=encoding)
308 self.enable_write = enable_write
310 # dict of inode to filehandle
311 self._filehandles = {}
312 self._filehandles_counter = itertools.count(0)
314 # Other threads that need to wait until the fuse driver
315 # is fully initialized should wait() on this event object.
316 self.initlock = threading.Event()
318 self.num_retries = num_retries
323 # Allow threads that are waiting for the driver to be finished
324 # initializing to continue
335 def access(self, inode, mode, ctx):
338 def listen_for_events(self, api_client):
339 self.events = arvados.events.subscribe(api_client,
340 [["event_type", "in", ["create", "update", "delete"]]],
344 def on_event(self, ev):
345 if 'event_type' in ev:
347 item = self.inodes.inode_cache.find(ev["object_uuid"])
350 if ev["object_kind"] == "arvados#collection":
351 new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
353 # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
354 # should always be the same.
355 #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
356 record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
358 item.update(to_record_version=record_version)
362 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
363 olditemparent = self.inodes.inode_cache.find(oldowner)
364 if olditemparent is not None:
365 olditemparent.invalidate()
366 olditemparent.update()
368 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
369 if itemparent is not None:
370 itemparent.invalidate()
375 def getattr(self, inode):
376 if inode not in self.inodes:
377 raise llfuse.FUSEError(errno.ENOENT)
379 e = self.inodes[inode]
381 entry = llfuse.EntryAttributes()
384 entry.entry_timeout = 60
385 entry.attr_timeout = 60
387 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
388 if isinstance(e, Directory):
389 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
391 entry.st_mode |= stat.S_IFREG
392 if isinstance(e, FuseArvadosFile):
393 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
395 if self.enable_write and e.writable():
396 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
399 entry.st_uid = self.uid
400 entry.st_gid = self.gid
403 entry.st_size = e.size()
405 entry.st_blksize = 512
406 entry.st_blocks = (entry.st_size/512)+1
407 entry.st_atime = int(e.atime())
408 entry.st_mtime = int(e.mtime())
409 entry.st_ctime = int(e.mtime())
414 def setattr(self, inode, attr):
415 entry = self.getattr(inode)
417 e = self.inodes[inode]
419 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
420 with llfuse.lock_released:
421 e.arvfile.truncate(attr.st_size)
422 entry.st_size = e.arvfile.size()
427 def lookup(self, parent_inode, name):
428 name = unicode(name, self.inodes.encoding)
434 if parent_inode in self.inodes:
435 p = self.inodes[parent_inode]
437 inode = p.parent_inode
438 elif isinstance(p, Directory) and name in p:
439 inode = p[name].inode
442 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
443 parent_inode, name, inode)
444 self.inodes[inode].inc_ref()
445 return self.getattr(inode)
447 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
449 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release kernel lookup references for a batch of inodes.

    ``inodes`` is an iterable of ``(inode, nlookup)`` pairs.  For each pair,
    ``dec_ref(nlookup)`` is called on the entry; when that returns 0 and the
    entry is already marked ``dead``, the entry is removed from the inode
    table via ``del_entry``.

    Inodes that are no longer in the table (for example after the table has
    been cleared) are skipped with a debug message rather than raising:
    ``forget`` does not answer a particular client request, and raising
    part-way through would leave the remaining pairs in the batch
    unprocessed.
    """
    for inode, nlookup in inodes:
        if inode not in self.inodes:
            # Unknown inode: nothing to release; keep processing the batch.
            _logger.debug("arv-mount forget: inode %i not found (nlookup %i), skipping",
                          inode, nlookup)
            continue
        ent = self.inodes[inode]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
        # Only entries the kernel has fully forgotten AND that are dead
        # may be dropped from the table.
        if ent.dec_ref(nlookup) == 0 and ent.dead:
            self.inodes.del_entry(ent)
460 def open(self, inode, flags):
461 if inode in self.inodes:
462 p = self.inodes[inode]
464 raise llfuse.FUSEError(errno.ENOENT)
466 if isinstance(p, Directory):
467 raise llfuse.FUSEError(errno.EISDIR)
469 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
470 raise llfuse.FUSEError(errno.EPERM)
472 fh = next(self._filehandles_counter)
473 self._filehandles[fh] = FileHandle(fh, p)
478 def read(self, fh, off, size):
479 _logger.debug("arv-mount read %i %i %i", fh, off, size)
480 if fh in self._filehandles:
481 handle = self._filehandles[fh]
483 raise llfuse.FUSEError(errno.EBADF)
485 self.inodes.touch(handle.obj)
487 return handle.obj.readfrom(off, size, self.num_retries)
490 def write(self, fh, off, buf):
491 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
492 if fh in self._filehandles:
493 handle = self._filehandles[fh]
495 raise llfuse.FUSEError(errno.EBADF)
497 if not handle.obj.writable():
498 raise llfuse.FUSEError(errno.EPERM)
500 self.inodes.touch(handle.obj)
502 return handle.obj.writeto(off, buf, self.num_retries)
505 def release(self, fh):
506 if fh in self._filehandles:
508 self._filehandles[fh].flush()
512 self._filehandles[fh].release()
513 del self._filehandles[fh]
514 self.inodes.inode_cache.cap_cache()
516 def releasedir(self, fh):
520 def opendir(self, inode):
521 _logger.debug("arv-mount opendir: inode %i", inode)
523 if inode in self.inodes:
524 p = self.inodes[inode]
526 raise llfuse.FUSEError(errno.ENOENT)
528 if not isinstance(p, Directory):
529 raise llfuse.FUSEError(errno.ENOTDIR)
531 fh = next(self._filehandles_counter)
532 if p.parent_inode in self.inodes:
533 parent = self.inodes[p.parent_inode]
535 raise llfuse.FUSEError(errno.EIO)
540 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
544 def readdir(self, fh, off):
545 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
547 if fh in self._filehandles:
548 handle = self._filehandles[fh]
550 raise llfuse.FUSEError(errno.EBADF)
552 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
555 while e < len(handle.entries):
556 if handle.entries[e][1].inode in self.inodes:
557 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
562 st = llfuse.StatvfsData()
563 st.f_bsize = 128 * 1024
576 def _check_writable(self, inode_parent):
577 if not self.enable_write:
578 raise llfuse.FUSEError(errno.EROFS)
580 if inode_parent in self.inodes:
581 p = self.inodes[inode_parent]
583 raise llfuse.FUSEError(errno.ENOENT)
585 if not isinstance(p, Directory):
586 raise llfuse.FUSEError(errno.ENOTDIR)
589 raise llfuse.FUSEError(errno.EPERM)
594 def create(self, inode_parent, name, mode, flags, ctx):
595 _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
597 p = self._check_writable(inode_parent)
600 # The file entry should have been implicitly created by callback.
602 fh = next(self._filehandles_counter)
603 self._filehandles[fh] = FileHandle(fh, f)
607 return (fh, self.getattr(f.inode))
610 def mkdir(self, inode_parent, name, mode, ctx):
611 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
613 p = self._check_writable(inode_parent)
616 # The dir entry should have been implicitly created by callback.
620 return self.getattr(d.inode)
623 def unlink(self, inode_parent, name):
624 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
625 p = self._check_writable(inode_parent)
629 def rmdir(self, inode_parent, name):
630 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
631 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move entry ``name_old`` of one directory to ``name_new`` in another.

    Both parent directories are validated with ``_check_writable``, the old
    parent first and then the new one.  Validation raises FUSEError (EROFS,
    ENOENT, ENOTDIR or EPERM) when a parent is unsuitable.  The move itself
    is delegated to the destination directory's ``rename``, which receives
    the source directory object.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'",
                  inode_parent_old, name_old,
                  inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
643 if fh in self._filehandles:
644 self._filehandles[fh].flush()
646 def fsync(self, fh, datasync):
649 def fsyncdir(self, fh, datasync):