1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
77 llfuse.capi._notify_queue = Queue.Queue()
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
82 _logger = logging.getLogger('arvados.arvados_fuse')
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
91 """Connects a numeric file handle to a File or Directory object that has
92 been opened by the client."""
94 def __init__(self, fh, obj):
103 if self.obj.writable():
104 return self.obj.flush()
107 class FileHandle(Handle):
108 """Connects a numeric file handle to a File object that has
109 been opened by the client."""
class DirectoryHandle(Handle):
    """File handle for a Directory object opened by the client.

    On top of what Handle records, this keeps ``entries``: the directory
    listing captured at opendir time, as (name, object) pairs, which
    readdir walks through by offset.
    """

    def __init__(self, fh, dirobj, entries):
        # Let the base class record the handle number and the wrapped object.
        super(DirectoryHandle, self).__init__(fh=fh, obj=dirobj)
        # Listing snapshot taken when the directory was opened.
        self.entries = entries
122 class InodeCache(object):
123 """Records the memory footprint of objects and when they are last used.
125 When the cache limit is exceeded, the least recently used objects are
126 cleared. Clearing the object means discarding its contents to release
127 memory. The next time the object is accessed, it must be re-fetched from
128 the server. Note that the inode cache limit is a soft limit; the cache
129 limit may be exceeded if necessary to load very large objects, it may also
130 be exceeded if open file handles prevent objects from being cleared.
134 def __init__(self, cap, min_entries=4):
135 self._entries = collections.OrderedDict()
139 self.min_entries = min_entries
144 def _remove(self, obj, clear):
147 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
149 if obj.has_ref(True):
150 obj.kernel_invalidate()
151 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
155 # The llfuse lock is released in del_entry(), which is called by
156 # Directory.clear(). While the llfuse lock is released, it can happen
157 # that a reentrant call removes this entry before this call gets to it.
158 # Ensure that the entry is still valid before trying to remove it.
159 if obj.inode not in self._entries:
162 self._total -= obj.cache_size
163 del self._entries[obj.inode]
165 self._by_uuid[obj.cache_uuid].remove(obj)
166 if not self._by_uuid[obj.cache_uuid]:
167 del self._by_uuid[obj.cache_uuid]
168 obj.cache_uuid = None
170 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
173 if self._total > self.cap:
174 for ent in self._entries.values():
175 if self._total < self.cap or len(self._entries) < self.min_entries:
177 self._remove(ent, True)
179 def manage(self, obj):
181 obj.cache_size = obj.objsize()
182 self._entries[obj.inode] = obj
183 obj.cache_uuid = obj.uuid()
185 if obj.cache_uuid not in self._by_uuid:
186 self._by_uuid[obj.cache_uuid] = [obj]
188 if obj not in self._by_uuid[obj.cache_uuid]:
189 self._by_uuid[obj.cache_uuid].append(obj)
190 self._total += obj.objsize()
191 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
194 def touch(self, obj):
196 if obj.inode in self._entries:
197 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking *obj* in the cache, removing it with clear=True.

    Nothing happens unless obj.persisted() is true and the object's inode
    is currently one of the cache entries.
    """
    # persisted() is checked first so obj.inode is only read for persisted objects.
    if not obj.persisted():
        return
    if obj.inode not in self._entries:
        return
    self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return a list of the cached objects whose cache_uuid equals *uuid*.

    The result is a fresh list on every call (empty when nothing matches),
    never the cache's internal per-uuid list. The cache's own bookkeeping
    appends to and removes from those internal lists, and deletes them
    when they become empty. Callers such as the event handler iterate the
    result while updating each object, which may re-enter that
    bookkeeping. A snapshot keeps such iteration from skipping or
    repeating objects, and stops callers from mutating the index by
    accident.
    """
    return list(self._by_uuid.get(uuid, ()))
208 self._entries.clear()
209 self._by_uuid.clear()
212 class Inodes(object):
213 """Manage the set of inodes. This is the mapping from a numeric id
214 to a concrete File or Directory object"""
216 def __init__(self, inode_cache, encoding="utf-8"):
218 self._counter = itertools.count(llfuse.ROOT_INODE)
219 self.inode_cache = inode_cache
220 self.encoding = encoding
221 self.deferred_invalidations = []
def __getitem__(self, item):
    """Look up the file system object registered under inode number *item*.

    Raises KeyError when that inode is not in the table.
    """
    table = self._entries
    return table[item]
def __setitem__(self, key, item):
    """Register *item* under inode number *key*, replacing any earlier entry."""
    table = self._entries
    table[key] = item
230 return self._entries.iterkeys()
233 return self._entries.items()
def __contains__(self, k):
    """Report whether inode number *k* is registered in the table."""
    table = self._entries
    return k in table
def touch(self, entry):
    """Record an access to *entry*.

    Sets entry._atime to the current wall-clock time, then passes the
    entry to the inode cache's own touch().
    """
    now = time.time()
    entry._atime = now
    cache = self.inode_cache
    cache.touch(entry)
242 def add_entry(self, entry):
243 entry.inode = next(self._counter)
244 if entry.inode == llfuse.ROOT_INODE:
246 self._entries[entry.inode] = entry
247 self.inode_cache.manage(entry)
250 def del_entry(self, entry):
251 if entry.ref_count == 0:
252 self.inode_cache.unmanage(entry)
253 del self._entries[entry.inode]
254 with llfuse.lock_released:
256 self.invalidate_inode(entry.inode)
260 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Forward an invalidation request for *inode* to llfuse.invalidate_inode."""
    notify = llfuse.invalidate_inode
    notify(inode)
def invalidate_entry(self, inode, name):
    """Forward an invalidation for entry *name* in directory *inode* to llfuse.

    *name* is encoded with this table's configured encoding before being
    handed to llfuse.invalidate_entry.
    """
    encoded_name = name.encode(self.encoding)
    llfuse.invalidate_entry(inode, encoded_name)
269 self.inode_cache.clear()
271 for k,v in self._entries.items():
274 except Exception as e:
275 _logger.exception("Error during finalize of inode %i", k)
277 self._entries.clear()
280 def catch_exceptions(orig_func):
281 """Catch uncaught exceptions and log them consistently."""
283 @functools.wraps(orig_func)
284 def catch_exceptions_wrapper(self, *args, **kwargs):
286 return orig_func(self, *args, **kwargs)
287 except llfuse.FUSEError:
289 except EnvironmentError as e:
290 raise llfuse.FUSEError(e.errno)
291 except arvados.errors.KeepWriteError as e:
292 _logger.error("Keep write error: " + str(e))
293 raise llfuse.FUSEError(errno.EIO)
294 except arvados.errors.NotFoundError as e:
295 _logger.error("Block not found error: " + str(e))
296 raise llfuse.FUSEError(errno.EIO)
298 _logger.exception("Unhandled exception during FUSE operation")
299 raise llfuse.FUSEError(errno.EIO)
301 return catch_exceptions_wrapper
304 class Operations(llfuse.Operations):
305 """This is the main interface with llfuse.
307 The methods on this object are called by llfuse threads to service FUSE
308 events to query and read from the file system.
310 llfuse has its own global lock which is acquired before calling a request handler,
311 so request handlers do not run concurrently unless the lock is explicitly released
312 using 'with llfuse.lock_released:'
316 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
317 super(Operations, self).__init__()
319 self._api_client = api_client
322 inode_cache = InodeCache(cap=256*1024*1024)
323 self.inodes = Inodes(inode_cache, encoding=encoding)
326 self.enable_write = enable_write
328 # dict of inode to filehandle
329 self._filehandles = {}
330 self._filehandles_counter = itertools.count(0)
332 # Other threads that need to wait until the fuse driver
333 # is fully initialized should wait() on this event object.
334 self.initlock = threading.Event()
336 # If we get overlapping shutdown events (e.g., fusermount -u
337 # -z and operations.destroy()) llfuse calls forget() on inodes
338 # that have already been deleted. To avoid this, we make
339 # forget() a no-op if called after destroy().
340 self._shutdown_started = threading.Event()
342 self.num_retries = num_retries
344 self.read_counter = arvados.keep.Counter()
345 self.write_counter = arvados.keep.Counter()
346 self.read_ops_counter = arvados.keep.Counter()
347 self.write_ops_counter = arvados.keep.Counter()
352 # Allow threads that are waiting for the driver to be finished
353 # initializing to continue
359 self._shutdown_started.set()
366 def access(self, inode, mode, ctx):
369 def listen_for_events(self):
370 self.events = arvados.events.subscribe(self._api_client,
371 [["event_type", "in", ["create", "update", "delete"]]],
375 def on_event(self, ev):
376 if 'event_type' not in ev:
379 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
381 if ev["object_kind"] == "arvados#collection":
382 new_attr = (ev.get("properties") and
383 ev["properties"].get("new_attributes") and
384 ev["properties"]["new_attributes"])
386 # new_attributes.modified_at currently lacks
387 # subsecond precision (see #6347) so use event_at
388 # which should always be the same.
390 (ev["event_at"], new_attr["portable_data_hash"])
391 if new_attr else None)
393 item.update(to_record_version=record_version)
398 ev.get("properties") and
399 ev["properties"].get("old_attributes") and
400 ev["properties"]["old_attributes"].get("owner_uuid"))
401 newowner = ev["object_owner_uuid"]
403 self.inodes.inode_cache.find_by_uuid(oldowner) +
404 self.inodes.inode_cache.find_by_uuid(newowner)):
410 def getattr(self, inode):
411 if inode not in self.inodes:
412 raise llfuse.FUSEError(errno.ENOENT)
414 e = self.inodes[inode]
416 entry = llfuse.EntryAttributes()
419 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
420 entry.attr_timeout = 60 if e.allow_attr_cache else 0
422 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
423 if isinstance(e, Directory):
424 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
426 entry.st_mode |= stat.S_IFREG
427 if isinstance(e, FuseArvadosFile):
428 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
430 if self.enable_write and e.writable():
431 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
434 entry.st_uid = self.uid
435 entry.st_gid = self.gid
438 entry.st_size = e.size()
440 entry.st_blksize = 512
441 entry.st_blocks = (entry.st_size/512)+1
442 entry.st_atime = int(e.atime())
443 entry.st_mtime = int(e.mtime())
444 entry.st_ctime = int(e.mtime())
449 def setattr(self, inode, attr):
450 entry = self.getattr(inode)
452 e = self.inodes[inode]
454 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
455 with llfuse.lock_released:
456 e.arvfile.truncate(attr.st_size)
457 entry.st_size = e.arvfile.size()
462 def lookup(self, parent_inode, name):
463 name = unicode(name, self.inodes.encoding)
469 if parent_inode in self.inodes:
470 p = self.inodes[parent_inode]
473 inode = p.parent_inode
474 elif isinstance(p, Directory) and name in p:
475 inode = p[name].inode
478 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
479 parent_inode, name, inode)
480 self.inodes[inode].inc_ref()
481 return self.getattr(inode)
483 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
485 raise llfuse.FUSEError(errno.ENOENT)
488 def forget(self, inodes):
489 if self._shutdown_started.is_set():
491 for inode, nlookup in inodes:
492 ent = self.inodes[inode]
493 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
494 if ent.dec_ref(nlookup) == 0 and ent.dead:
495 self.inodes.del_entry(ent)
498 def open(self, inode, flags):
499 if inode in self.inodes:
500 p = self.inodes[inode]
502 raise llfuse.FUSEError(errno.ENOENT)
504 if isinstance(p, Directory):
505 raise llfuse.FUSEError(errno.EISDIR)
507 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
508 raise llfuse.FUSEError(errno.EPERM)
510 fh = next(self._filehandles_counter)
511 self._filehandles[fh] = FileHandle(fh, p)
514 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
519 def read(self, fh, off, size):
520 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
521 self.read_ops_counter.add(1)
523 if fh in self._filehandles:
524 handle = self._filehandles[fh]
526 raise llfuse.FUSEError(errno.EBADF)
528 self.inodes.touch(handle.obj)
530 r = handle.obj.readfrom(off, size, self.num_retries)
532 self.read_counter.add(len(r))
536 def write(self, fh, off, buf):
537 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
538 self.write_ops_counter.add(1)
540 if fh in self._filehandles:
541 handle = self._filehandles[fh]
543 raise llfuse.FUSEError(errno.EBADF)
545 if not handle.obj.writable():
546 raise llfuse.FUSEError(errno.EPERM)
548 self.inodes.touch(handle.obj)
550 w = handle.obj.writeto(off, buf, self.num_retries)
552 self.write_counter.add(w)
556 def release(self, fh):
557 if fh in self._filehandles:
559 self._filehandles[fh].flush()
563 self._filehandles[fh].release()
564 del self._filehandles[fh]
565 self.inodes.inode_cache.cap_cache()
567 def releasedir(self, fh):
571 def opendir(self, inode):
572 _logger.debug("arv-mount opendir: inode %i", inode)
574 if inode in self.inodes:
575 p = self.inodes[inode]
577 raise llfuse.FUSEError(errno.ENOENT)
579 if not isinstance(p, Directory):
580 raise llfuse.FUSEError(errno.ENOTDIR)
582 fh = next(self._filehandles_counter)
583 if p.parent_inode in self.inodes:
584 parent = self.inodes[p.parent_inode]
586 raise llfuse.FUSEError(errno.EIO)
591 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
595 def readdir(self, fh, off):
596 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
598 if fh in self._filehandles:
599 handle = self._filehandles[fh]
601 raise llfuse.FUSEError(errno.EBADF)
604 while e < len(handle.entries):
605 if handle.entries[e][1].inode in self.inodes:
606 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
611 st = llfuse.StatvfsData()
612 st.f_bsize = 128 * 1024
625 def _check_writable(self, inode_parent):
626 if not self.enable_write:
627 raise llfuse.FUSEError(errno.EROFS)
629 if inode_parent in self.inodes:
630 p = self.inodes[inode_parent]
632 raise llfuse.FUSEError(errno.ENOENT)
634 if not isinstance(p, Directory):
635 raise llfuse.FUSEError(errno.ENOTDIR)
638 raise llfuse.FUSEError(errno.EPERM)
643 def create(self, inode_parent, name, mode, flags, ctx):
644 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
646 p = self._check_writable(inode_parent)
649 # The file entry should have been implicitly created by callback.
651 fh = next(self._filehandles_counter)
652 self._filehandles[fh] = FileHandle(fh, f)
656 return (fh, self.getattr(f.inode))
659 def mkdir(self, inode_parent, name, mode, ctx):
660 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
662 p = self._check_writable(inode_parent)
665 # The dir entry should have been implicitly created by callback.
669 return self.getattr(d.inode)
672 def unlink(self, inode_parent, name):
673 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
674 p = self._check_writable(inode_parent)
678 def rmdir(self, inode_parent, name):
679 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
680 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move entry *name_old* of directory *inode_parent_old* to *name_new* in *inode_parent_new*.

    Both parent inodes are validated through _check_writable, old parent
    first. The destination directory object then performs the rename,
    receiving the source directory object as its third argument.
    """
    _logger.debug(
        "arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'",
        inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
692 if fh in self._filehandles:
693 self._filehandles[fh].flush()
695 def fsync(self, fh, datasync):
698 def fsyncdir(self, fh, datasync):