1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
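For example (illustrative; `file_obj` stands for any open `File` object, and the
arguments mirror the `readfrom`/`writeto` calls made by the read and write
handlers below):

    data = file_obj.readfrom(offset, size, num_retries)
    written = file_obj.writeto(offset, buf, num_retries)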
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before
returning a result.
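For example (illustrative names):

    if 'notes.txt' in directory_obj:
        entry = directory_obj['notes.txt']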
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
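A simplified sketch of this pattern (hypothetical; the real handlers such as
`getattr` and `read` below add locking, caching, and error handling):

    def handler(self, inode):
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)
        obj = self.inodes[inode]
        return obj.some_method()  # hypothetical method name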
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
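A minimal sketch of how the driver is typically wired up (the mountpoint, the
option strings, and the `api` client are illustrative; the real mount logic
lives in the arv-mount command):

    operations = Operations(os.getuid(), os.getgid(), api_client=api)
    llfuse.init(operations, '/mnt/arvados', ['fsname=arvados'])
    operations.listen_for_events()
    llfuse.main()
    llfuse.close()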
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# details.
77 llfuse.capi._notify_queue = Queue.Queue()
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
82 _logger = logging.getLogger('arvados.arvados_fuse')
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
91 """Connects a numeric file handle to a File or Directory object that has
92 been opened by the client."""
94 def __init__(self, fh, obj):
def flush(self):
    if self.obj.writable():
        return self.obj.flush()
107 class FileHandle(Handle):
108 """Connects a numeric file handle to a File object that has
109 been opened by the client."""
113 class DirectoryHandle(Handle):
114 """Connects a numeric file handle to a Directory object that has
115 been opened by the client."""
117 def __init__(self, fh, dirobj, entries):
118 super(DirectoryHandle, self).__init__(fh, dirobj)
119 self.entries = entries
122 class InodeCache(object):
123 """Records the memory footprint of objects and when they are last used.
125 When the cache limit is exceeded, the least recently used objects are
126 cleared. Clearing the object means discarding its contents to release
127 memory. The next time the object is accessed, it must be re-fetched from
the server. Note that the inode cache limit is a soft limit: it may be
exceeded when necessary to load very large objects, and it may also be
exceeded when open file handles prevent objects from being cleared.
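A construction sketch (the 256 MiB cap mirrors the default used by the
`Operations` constructor below; `min_entries` keeps a few recently used
objects resident even when the cap is exceeded):

    inode_cache = InodeCache(cap=256*1024*1024, min_entries=4)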
134 def __init__(self, cap, min_entries=4):
self._entries = collections.OrderedDict()
self._by_uuid = {}
self._total = 0
self.cap = cap
self.min_entries = min_entries
def _remove(self, obj, clear):
    if clear:
        if obj.in_use():
            _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
            return
149 if obj.has_ref(True):
150 obj.kernel_invalidate()
151 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
155 # The llfuse lock is released in del_entry(), which is called by
156 # Directory.clear(). While the llfuse lock is released, it can happen
157 # that a reentrant call removes this entry before this call gets to it.
158 # Ensure that the entry is still valid before trying to remove it.
159 if obj.inode not in self._entries:
162 self._total -= obj.cache_size
163 del self._entries[obj.inode]
165 self._by_uuid[obj.cache_uuid].remove(obj)
166 if not self._by_uuid[obj.cache_uuid]:
167 del self._by_uuid[obj.cache_uuid]
168 obj.cache_uuid = None
170 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
def cap_cache(self):
    if self._total > self.cap:
        for ent in self._entries.values():
            if self._total < self.cap or len(self._entries) < self.min_entries:
                break
            self._remove(ent, True)
179 def manage(self, obj):
181 obj.cache_size = obj.objsize()
182 self._entries[obj.inode] = obj
183 obj.cache_uuid = obj.uuid()
185 if obj.cache_uuid not in self._by_uuid:
186 self._by_uuid[obj.cache_uuid] = [obj]
188 if obj not in self._by_uuid[obj.cache_uuid]:
189 self._by_uuid[obj.cache_uuid].append(obj)
190 self._total += obj.objsize()
191 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
194 def touch(self, obj):
if obj.inode in self._entries:
    self._remove(obj, False)
self.manage(obj)
200 def unmanage(self, obj):
201 if obj.persisted() and obj.inode in self._entries:
202 self._remove(obj, True)
204 def find_by_uuid(self, uuid):
205 return self._by_uuid.get(uuid, [])
def clear(self):
    self._entries.clear()
    self._by_uuid.clear()
    self._total = 0
212 class Inodes(object):
213 """Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object."""
216 def __init__(self, inode_cache, encoding="utf-8"):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
219 self.inode_cache = inode_cache
220 self.encoding = encoding
221 self.deferred_invalidations = []
223 def __getitem__(self, item):
224 return self._entries[item]
226 def __setitem__(self, key, item):
227 self._entries[key] = item
def __iter__(self):
    return self._entries.iterkeys()
def items(self):
    return self._entries.items()
235 def __contains__(self, k):
236 return k in self._entries
238 def touch(self, entry):
239 entry._atime = time.time()
240 self.inode_cache.touch(entry)
242 def add_entry(self, entry):
243 entry.inode = next(self._counter)
if entry.inode == llfuse.ROOT_INODE:
    entry.inc_ref()
self._entries[entry.inode] = entry
247 self.inode_cache.manage(entry)
250 def del_entry(self, entry):
251 if entry.ref_count == 0:
252 self.inode_cache.unmanage(entry)
253 del self._entries[entry.inode]
with llfuse.lock_released:
    entry.finalize()
self.invalidate_inode(entry.inode)
else:
    entry.dead = True
    _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
262 def invalidate_inode(self, inode):
263 llfuse.invalidate_inode(inode)
265 def invalidate_entry(self, inode, name):
266 llfuse.invalidate_entry(inode, name.encode(self.encoding))
def clear(self):
    self.inode_cache.clear()

    for k,v in self._entries.items():
        try:
            v.finalize()
        except Exception as e:
            _logger.exception("Error during finalize of inode %i", k)
277 self._entries.clear()
280 def catch_exceptions(orig_func):
281 """Catch uncaught exceptions and log them consistently."""
283 @functools.wraps(orig_func)
284 def catch_exceptions_wrapper(self, *args, **kwargs):
try:
    return orig_func(self, *args, **kwargs)
except llfuse.FUSEError:
    raise
289 except EnvironmentError as e:
290 raise llfuse.FUSEError(e.errno)
291 except arvados.errors.KeepWriteError as e:
292 _logger.error("Keep write error: " + str(e))
293 raise llfuse.FUSEError(errno.EIO)
294 except arvados.errors.NotFoundError as e:
295 _logger.error("Block not found error: " + str(e))
296 raise llfuse.FUSEError(errno.EIO)
except Exception:
    _logger.exception("Unhandled exception during FUSE operation")
    raise llfuse.FUSEError(errno.EIO)
301 return catch_exceptions_wrapper
304 class Operations(llfuse.Operations):
305 """This is the main interface with llfuse.
307 The methods on this object are called by llfuse threads to service FUSE
308 events to query and read from the file system.
310 llfuse has its own global lock which is acquired before calling a request handler,
311 so request handlers do not run concurrently unless the lock is explicitly released
312 using 'with llfuse.lock_released:'
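A sketch of that pattern as it appears in the handlers below (`llfuse.lock_released`
is the real llfuse API; the read call is taken from the `read` handler):

    with llfuse.lock_released:
        # Slow Keep/API I/O runs here so other request handlers are not blocked.
        r = handle.obj.readfrom(off, size, self.num_retries)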
316 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
317 super(Operations, self).__init__()
319 self._api_client = api_client
if not inode_cache:
    inode_cache = InodeCache(cap=256*1024*1024)
self.inodes = Inodes(inode_cache, encoding=encoding)
self.uid = uid
self.gid = gid
self.enable_write = enable_write
# Maps numeric file handle to an open FileHandle or DirectoryHandle object
329 self._filehandles = {}
330 self._filehandles_counter = itertools.count(0)
332 # Other threads that need to wait until the fuse driver
333 # is fully initialized should wait() on this event object.
334 self.initlock = threading.Event()
336 # If we get overlapping shutdown events (e.g., fusermount -u
337 # -z and operations.destroy()) llfuse calls forget() on inodes
338 # that have already been deleted. To avoid this, we make
339 # forget() a no-op if called after destroy().
340 self._shutdown_started = threading.Event()
342 self.num_retries = num_retries
344 self.read_counter = arvados.keep.Counter()
345 self.write_counter = arvados.keep.Counter()
346 self.read_ops_counter = arvados.keep.Counter()
347 self.write_ops_counter = arvados.keep.Counter()
def init(self):
    # Allow threads that are waiting for the driver to be finished
    # initializing to continue
    self.initlock.set()

def destroy(self):
    self._shutdown_started.set()
def access(self, inode, mode, ctx):
    return True
369 def listen_for_events(self):
self.events = arvados.events.subscribe(
    self._api_client,
    [["event_type", "in", ["create", "update", "delete"]]],
    self.on_event)
376 def on_event(self, ev):
if 'event_type' not in ev:
    return
380 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
382 if ev["object_kind"] == "arvados#collection":
383 new_attr = (ev.get("properties") and
384 ev["properties"].get("new_attributes") and
385 ev["properties"]["new_attributes"])
387 # new_attributes.modified_at currently lacks
388 # subsecond precision (see #6347) so use event_at
389 # which should always be the same.
record_version = (
    (ev["event_at"], new_attr["portable_data_hash"])
    if new_attr else None)
394 item.update(to_record_version=record_version)
oldowner = (
    ev.get("properties") and
    ev["properties"].get("old_attributes") and
    ev["properties"]["old_attributes"].get("owner_uuid"))
402 newowner = ev["object_owner_uuid"]
for parent in (
        self.inodes.inode_cache.find_by_uuid(oldowner) +
        self.inodes.inode_cache.find_by_uuid(newowner)):
    parent.invalidate()
    parent.update()
411 def getattr(self, inode):
412 if inode not in self.inodes:
413 raise llfuse.FUSEError(errno.ENOENT)
415 e = self.inodes[inode]
417 entry = llfuse.EntryAttributes()
420 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
421 entry.attr_timeout = 60 if e.allow_attr_cache else 0
423 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
424 if isinstance(e, Directory):
425 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
else:
    entry.st_mode |= stat.S_IFREG
    if isinstance(e, FuseArvadosFile):
        entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
431 if self.enable_write and e.writable():
432 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
435 entry.st_uid = self.uid
436 entry.st_gid = self.gid
439 entry.st_size = e.size()
441 entry.st_blksize = 512
442 entry.st_blocks = (entry.st_size/512)+1
443 entry.st_atime = int(e.atime())
444 entry.st_mtime = int(e.mtime())
445 entry.st_ctime = int(e.mtime())
450 def setattr(self, inode, attr):
451 entry = self.getattr(inode)
453 e = self.inodes[inode]
455 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
456 with llfuse.lock_released:
457 e.arvfile.truncate(attr.st_size)
entry.st_size = e.arvfile.size()

return entry
463 def lookup(self, parent_inode, name):
464 name = unicode(name, self.inodes.encoding)
470 if parent_inode in self.inodes:
471 p = self.inodes[parent_inode]
if name == '..':
    inode = p.parent_inode
elif isinstance(p, Directory) and name in p:
476 inode = p[name].inode
if inode:
    _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
480 parent_inode, name, inode)
481 self.inodes[inode].inc_ref()
482 return self.getattr(inode)
else:
    _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
                  parent_inode, name)
486 raise llfuse.FUSEError(errno.ENOENT)
489 def forget(self, inodes):
if self._shutdown_started.is_set():
    return
492 for inode, nlookup in inodes:
493 ent = self.inodes[inode]
494 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
495 if ent.dec_ref(nlookup) == 0 and ent.dead:
496 self.inodes.del_entry(ent)
499 def open(self, inode, flags):
500 if inode in self.inodes:
501 p = self.inodes[inode]
else:
    raise llfuse.FUSEError(errno.ENOENT)
505 if isinstance(p, Directory):
506 raise llfuse.FUSEError(errno.EISDIR)
508 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
509 raise llfuse.FUSEError(errno.EPERM)
511 fh = next(self._filehandles_counter)
512 self._filehandles[fh] = FileHandle(fh, p)
515 # Normally, we will have received an "update" event if the
516 # parent collection is stale here. However, even if the parent
517 # collection hasn't changed, the manifest might have been
518 # fetched so long ago that the signatures on the data block
519 # locators have expired. Calling checkupdate() on all
# ancestors ensures the signatures will be refreshed if
# necessary.
522 while p.parent_inode in self.inodes:
if p == self.inodes[p.parent_inode]:
    break
p = self.inodes[p.parent_inode]
p.checkupdate()
_logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)

return fh
534 def read(self, fh, off, size):
535 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
536 self.read_ops_counter.add(1)
538 if fh in self._filehandles:
539 handle = self._filehandles[fh]
else:
    raise llfuse.FUSEError(errno.EBADF)
543 self.inodes.touch(handle.obj)
545 r = handle.obj.readfrom(off, size, self.num_retries)
self.read_counter.add(len(r))
return r
551 def write(self, fh, off, buf):
552 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
553 self.write_ops_counter.add(1)
555 if fh in self._filehandles:
556 handle = self._filehandles[fh]
else:
    raise llfuse.FUSEError(errno.EBADF)
560 if not handle.obj.writable():
561 raise llfuse.FUSEError(errno.EPERM)
563 self.inodes.touch(handle.obj)
565 w = handle.obj.writeto(off, buf, self.num_retries)
self.write_counter.add(w)
return w
571 def release(self, fh):
572 if fh in self._filehandles:
574 self._filehandles[fh].flush()
578 self._filehandles[fh].release()
579 del self._filehandles[fh]
580 self.inodes.inode_cache.cap_cache()
def releasedir(self, fh):
    self.release(fh)
586 def opendir(self, inode):
587 _logger.debug("arv-mount opendir: inode %i", inode)
589 if inode in self.inodes:
590 p = self.inodes[inode]
else:
    raise llfuse.FUSEError(errno.ENOENT)
594 if not isinstance(p, Directory):
595 raise llfuse.FUSEError(errno.ENOTDIR)
597 fh = next(self._filehandles_counter)
598 if p.parent_inode in self.inodes:
599 parent = self.inodes[p.parent_inode]
else:
    raise llfuse.FUSEError(errno.EIO)
self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
return fh
610 def readdir(self, fh, off):
611 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
613 if fh in self._filehandles:
614 handle = self._filehandles[fh]
else:
    raise llfuse.FUSEError(errno.EBADF)
e = off
while e < len(handle.entries):
620 if handle.entries[e][1].inode in self.inodes:
yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
e += 1
def statfs(self):
    st = llfuse.StatvfsData()
    st.f_bsize = 128 * 1024
640 def _check_writable(self, inode_parent):
641 if not self.enable_write:
642 raise llfuse.FUSEError(errno.EROFS)
644 if inode_parent in self.inodes:
645 p = self.inodes[inode_parent]
else:
    raise llfuse.FUSEError(errno.ENOENT)
649 if not isinstance(p, Directory):
650 raise llfuse.FUSEError(errno.ENOTDIR)
if not p.writable():
    raise llfuse.FUSEError(errno.EPERM)

return p
658 def create(self, inode_parent, name, mode, flags, ctx):
659 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
p = self._check_writable(inode_parent)
p.create(name)
# The file entry should have been implicitly created by callback.
f = p[name]
666 fh = next(self._filehandles_counter)
667 self._filehandles[fh] = FileHandle(fh, f)
f.inc_ref()
return (fh, self.getattr(f.inode))
674 def mkdir(self, inode_parent, name, mode, ctx):
675 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
p = self._check_writable(inode_parent)
p.mkdir(name)
# The dir entry should have been implicitly created by callback.
d = p[name]
d.inc_ref()
return self.getattr(d.inode)
687 def unlink(self, inode_parent, name):
688 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.unlink(name)
693 def rmdir(self, inode_parent, name):
694 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.rmdir(name)
699 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
700 _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
701 src = self._check_writable(inode_parent_old)
702 dest = self._check_writable(inode_parent_new)
703 dest.rename(name_old, name_new, src)
def flush(self, fh):
    if fh in self._filehandles:
        self._filehandles[fh].flush()
def fsync(self, fh, datasync):
    self.flush(fh)
def fsyncdir(self, fh, datasync):
    self.flush(fh)