1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
and `in`.  These methods automatically check whether the directory is fresh
(up to date) or stale (needs update) and will call `update` if necessary
before returning a result.
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
# The default _notify_queue has a limit of 1000 items, but it really needs to
# be unlimited to avoid deadlocks; see
# https://arvados.org/issues/3198#note-43 for details.
76 llfuse.capi._notify_queue = Queue.Queue()
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
81 _logger = logging.getLogger('arvados.arvados_fuse')
83 # Uncomment this to enable llfuse debug logging.
84 # log_handler = logging.StreamHandler()
85 # llogger = logging.getLogger('llfuse')
86 # llogger.addHandler(log_handler)
87 # llogger.setLevel(logging.DEBUG)
90 """Connects a numeric file handle to a File or Directory object that has
91 been opened by the client."""
93 def __init__(self, fh, obj):
102 if self.obj.writable():
103 return self.obj.flush()
class FileHandle(Handle):
    """Handle for a File object that a client has opened.

    Associates the numeric file handle with the File; no behaviour is
    added on top of what Handle provides.
    """
    pass
class DirectoryHandle(Handle):
    """Handle for a Directory object that a client has opened.

    Besides the handle number and the directory object, it keeps the
    `entries` sequence it was constructed with, so that a listing can be
    served from that fixed sequence by position.
    """

    def __init__(self, fh, dirobj, entries):
        """Bind handle number `fh` to `dirobj` and remember `entries`.

        `entries` is stored as given, without copying.
        """
        parent_init = super(DirectoryHandle, self).__init__
        parent_init(fh, dirobj)
        self.entries = entries
121 class InodeCache(object):
122 """Records the memory footprint of objects and when they are last used.
124 When the cache limit is exceeded, the least recently used objects are
125 cleared. Clearing the object means discarding its contents to release
126 memory. The next time the object is accessed, it must be re-fetched from
127 the server. Note that the inode cache limit is a soft limit; the cache
128 limit may be exceeded if necessary to load very large objects, it may also
129 be exceeded if open file handles prevent objects from being cleared.
133 def __init__(self, cap, min_entries=4):
134 self._entries = collections.OrderedDict()
136 self._counter = itertools.count(0)
139 self.min_entries = min_entries
144 def _remove(self, obj, clear):
145 if clear and not obj.clear():
146 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
148 self._total -= obj.cache_size
149 del self._entries[obj.cache_priority]
151 self._by_uuid[obj.cache_uuid].remove(obj)
152 if not self._by_uuid[obj.cache_uuid]:
153 del self._by_uuid[obj.cache_uuid]
154 obj.cache_uuid = None
156 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
160 if self._total > self.cap:
161 for key in list(self._entries.keys()):
162 if self._total < self.cap or len(self._entries) < self.min_entries:
164 self._remove(self._entries[key], True)
166 def manage(self, obj):
168 obj.cache_priority = next(self._counter)
169 obj.cache_size = obj.objsize()
170 self._entries[obj.cache_priority] = obj
171 obj.cache_uuid = obj.uuid()
173 if obj.cache_uuid not in self._by_uuid:
174 self._by_uuid[obj.cache_uuid] = [obj]
176 if obj not in self._by_uuid[obj.cache_uuid]:
177 self._by_uuid[obj.cache_uuid].append(obj)
178 self._total += obj.objsize()
179 _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
182 obj.cache_priority = None
184 def touch(self, obj):
186 if obj.cache_priority in self._entries:
187 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking `obj` in the cache and clear its contents.

    Nothing happens unless `obj` reports itself as persisted and its
    current cache priority is a key in the entries table.
    """
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    # True asks _remove to clear the object as well as dropping it.
    self._remove(obj, True)
def find(self, uuid):
    """Look up the cached objects registered under `uuid`.

    Returns whatever the by-uuid index holds for that key (a list of
    objects, as populated by `manage`), or None if the uuid is unknown.
    """
    matches = self._by_uuid.get(uuid, None)
    return matches
198 self._entries.clear()
199 self._by_uuid.clear()
202 class Inodes(object):
203 """Manage the set of inodes. This is the mapping from a numeric id
204 to a concrete File or Directory object"""
206 def __init__(self, inode_cache, encoding="utf-8"):
208 self._counter = itertools.count(llfuse.ROOT_INODE)
209 self.inode_cache = inode_cache
210 self.encoding = encoding
211 self.deferred_invalidations = []
def __getitem__(self, item):
    """Return the entry stored under inode number `item`.

    Raises KeyError if the inode is not in the table.
    """
    entry = self._entries[item]
    return entry
def __setitem__(self, key, item):
    """Store `item` in the inode table under inode number `key`.

    An existing entry under the same key is replaced.
    """
    table = self._entries
    table[key] = item
220 return self._entries.iterkeys()
223 return self._entries.items()
def __contains__(self, k):
    """Report whether inode number `k` is present in the inode table."""
    known = k in self._entries
    return known
def touch(self, entry):
    """Mark `entry` as just used.

    Stamps the entry's `_atime` with the current time and then passes
    the entry to the inode cache's own `touch`.
    """
    now = time.time()
    entry._atime = now
    cache = self.inode_cache
    cache.touch(entry)
232 def add_entry(self, entry):
233 entry.inode = next(self._counter)
234 if entry.inode == llfuse.ROOT_INODE:
236 self._entries[entry.inode] = entry
237 self.inode_cache.manage(entry)
240 def del_entry(self, entry):
241 if entry.ref_count == 0:
242 self.inode_cache.unmanage(entry)
243 del self._entries[entry.inode]
244 with llfuse.lock_released:
246 self.invalidate_inode(entry.inode)
250 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Forward an invalidation request for `inode` to llfuse.

    Thin wrapper around `llfuse.invalidate_inode`; nothing is returned.
    """
    notify = llfuse.invalidate_inode
    notify(inode)
def invalidate_entry(self, inode, name):
    """Forward an invalidation request for entry `name` under `inode` to llfuse.

    Thin wrapper around `llfuse.invalidate_entry`; nothing is returned.
    """
    notify = llfuse.invalidate_entry
    notify(inode, name)
259 self.inode_cache.clear()
261 for k,v in self._entries.items():
264 except Exception as e:
265 _logger.exception("Error during finalize of inode %i", k)
267 self._entries.clear()
270 def catch_exceptions(orig_func):
271 """Catch uncaught exceptions and log them consistently."""
273 @functools.wraps(orig_func)
274 def catch_exceptions_wrapper(self, *args, **kwargs):
276 return orig_func(self, *args, **kwargs)
277 except llfuse.FUSEError:
279 except EnvironmentError as e:
280 raise llfuse.FUSEError(e.errno)
281 except arvados.errors.KeepWriteError as e:
282 _logger.error("Keep write error: " + str(e))
283 raise llfuse.FUSEError(errno.EIO)
284 except arvados.errors.NotFoundError as e:
285 _logger.error("Block not found error: " + str(e))
286 raise llfuse.FUSEError(errno.EIO)
288 _logger.exception("Unhandled exception during FUSE operation")
289 raise llfuse.FUSEError(errno.EIO)
291 return catch_exceptions_wrapper
294 class Operations(llfuse.Operations):
295 """This is the main interface with llfuse.
297 The methods on this object are called by llfuse threads to service FUSE
298 events to query and read from the file system.
300 llfuse has its own global lock which is acquired before calling a request handler,
301 so request handlers do not run concurrently unless the lock is explicitly released
302 using 'with llfuse.lock_released:'
306 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
307 super(Operations, self).__init__()
310 inode_cache = InodeCache(cap=256*1024*1024)
311 self.inodes = Inodes(inode_cache, encoding=encoding)
314 self.enable_write = enable_write
316 # dict of inode to filehandle
317 self._filehandles = {}
318 self._filehandles_counter = itertools.count(0)
320 # Other threads that need to wait until the fuse driver
321 # is fully initialized should wait() on this event object.
322 self.initlock = threading.Event()
324 self.num_retries = num_retries
329 # Allow threads that are waiting for the driver to be finished
330 # initializing to continue
341 def access(self, inode, mode, ctx):
344 def listen_for_events(self, api_client):
345 self.events = arvados.events.subscribe(api_client,
346 [["event_type", "in", ["create", "update", "delete"]]],
350 def on_event(self, ev):
351 if 'event_type' in ev:
353 item = self.inodes.inode_cache.find(ev["object_uuid"])
356 if ev["object_kind"] == "arvados#collection":
357 new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
359 # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
360 # should always be the same.
361 #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
362 record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
364 item.update(to_record_version=record_version)
368 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
369 olditemparent = self.inodes.inode_cache.find(oldowner)
370 if olditemparent is not None:
371 olditemparent.invalidate()
372 olditemparent.update()
374 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
375 if itemparent is not None:
376 itemparent.invalidate()
381 def getattr(self, inode):
382 if inode not in self.inodes:
383 raise llfuse.FUSEError(errno.ENOENT)
385 e = self.inodes[inode]
387 entry = llfuse.EntryAttributes()
390 entry.entry_timeout = 60
391 entry.attr_timeout = 60
393 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
394 if isinstance(e, Directory):
395 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
397 entry.st_mode |= stat.S_IFREG
398 if isinstance(e, FuseArvadosFile):
399 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
401 if self.enable_write and e.writable():
402 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
405 entry.st_uid = self.uid
406 entry.st_gid = self.gid
409 entry.st_size = e.size()
411 entry.st_blksize = 512
412 entry.st_blocks = (entry.st_size/512)+1
413 entry.st_atime = int(e.atime())
414 entry.st_mtime = int(e.mtime())
415 entry.st_ctime = int(e.mtime())
420 def setattr(self, inode, attr):
421 entry = self.getattr(inode)
423 e = self.inodes[inode]
425 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
426 with llfuse.lock_released:
427 e.arvfile.truncate(attr.st_size)
428 entry.st_size = e.arvfile.size()
433 def lookup(self, parent_inode, name):
434 name = unicode(name, self.inodes.encoding)
440 if parent_inode in self.inodes:
441 p = self.inodes[parent_inode]
443 inode = p.parent_inode
444 elif isinstance(p, Directory) and name in p:
445 inode = p[name].inode
448 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
449 parent_inode, name, inode)
450 self.inodes[inode].inc_ref()
451 return self.getattr(inode)
453 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
455 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release lookup references on a batch of inodes.

    `inodes` is an iterable of (inode, nlookup) pairs.  Each entry's
    reference count is reduced by `nlookup`.  An entry whose count
    reaches zero and which is flagged `dead` is deleted from the inode
    table.

    Raises KeyError if an inode is not in the table.
    """
    for ino, lookups in inodes:
        node = self.inodes[ino]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", ino, lookups, node.ref_count)
        remaining = node.dec_ref(lookups)
        if remaining != 0:
            continue
        if node.dead:
            self.inodes.del_entry(node)
466 def open(self, inode, flags):
467 if inode in self.inodes:
468 p = self.inodes[inode]
470 raise llfuse.FUSEError(errno.ENOENT)
472 if isinstance(p, Directory):
473 raise llfuse.FUSEError(errno.EISDIR)
475 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
476 raise llfuse.FUSEError(errno.EPERM)
478 fh = next(self._filehandles_counter)
479 self._filehandles[fh] = FileHandle(fh, p)
484 def read(self, fh, off, size):
485 _logger.debug("arv-mount read %i %i %i", fh, off, size)
486 if fh in self._filehandles:
487 handle = self._filehandles[fh]
489 raise llfuse.FUSEError(errno.EBADF)
491 self.inodes.touch(handle.obj)
493 return handle.obj.readfrom(off, size, self.num_retries)
496 def write(self, fh, off, buf):
497 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
498 if fh in self._filehandles:
499 handle = self._filehandles[fh]
501 raise llfuse.FUSEError(errno.EBADF)
503 if not handle.obj.writable():
504 raise llfuse.FUSEError(errno.EPERM)
506 self.inodes.touch(handle.obj)
508 return handle.obj.writeto(off, buf, self.num_retries)
511 def release(self, fh):
512 if fh in self._filehandles:
514 self._filehandles[fh].flush()
518 self._filehandles[fh].release()
519 del self._filehandles[fh]
520 self.inodes.inode_cache.cap_cache()
522 def releasedir(self, fh):
526 def opendir(self, inode):
527 _logger.debug("arv-mount opendir: inode %i", inode)
529 if inode in self.inodes:
530 p = self.inodes[inode]
532 raise llfuse.FUSEError(errno.ENOENT)
534 if not isinstance(p, Directory):
535 raise llfuse.FUSEError(errno.ENOTDIR)
537 fh = next(self._filehandles_counter)
538 if p.parent_inode in self.inodes:
539 parent = self.inodes[p.parent_inode]
541 raise llfuse.FUSEError(errno.EIO)
546 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
550 def readdir(self, fh, off):
551 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
553 if fh in self._filehandles:
554 handle = self._filehandles[fh]
556 raise llfuse.FUSEError(errno.EBADF)
558 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
561 while e < len(handle.entries):
562 if handle.entries[e][1].inode in self.inodes:
563 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
568 st = llfuse.StatvfsData()
569 st.f_bsize = 128 * 1024
582 def _check_writable(self, inode_parent):
583 if not self.enable_write:
584 raise llfuse.FUSEError(errno.EROFS)
586 if inode_parent in self.inodes:
587 p = self.inodes[inode_parent]
589 raise llfuse.FUSEError(errno.ENOENT)
591 if not isinstance(p, Directory):
592 raise llfuse.FUSEError(errno.ENOTDIR)
595 raise llfuse.FUSEError(errno.EPERM)
600 def create(self, inode_parent, name, mode, flags, ctx):
601 _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
603 p = self._check_writable(inode_parent)
606 # The file entry should have been implicitly created by callback.
608 fh = next(self._filehandles_counter)
609 self._filehandles[fh] = FileHandle(fh, f)
613 return (fh, self.getattr(f.inode))
616 def mkdir(self, inode_parent, name, mode, ctx):
617 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
619 p = self._check_writable(inode_parent)
622 # The dir entry should have been implicitly created by callback.
626 return self.getattr(d.inode)
629 def unlink(self, inode_parent, name):
630 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
631 p = self._check_writable(inode_parent)
635 def rmdir(self, inode_parent, name):
636 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
637 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move entry `name_old` in one directory to `name_new` in another.

    Both parent inodes go through `_check_writable`, source first, so a
    failure on the source parent is reported before the destination is
    examined.  The rename itself is delegated to the destination
    directory object, which also receives the source directory.

    Any llfuse.FUSEError raised by `_check_writable` propagates.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
649 if fh in self._filehandles:
650 self._filehandles[fh].flush()
652 def fsync(self, fh, datasync):
655 def fsyncdir(self, fh, datasync):