1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before
accessing the underlying dict.
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
# The default _notify_queue has a limit of 1000 items, but it really needs to
# be unlimited to avoid deadlocks; see https://arvados.org/issues/3198#note-43
# for details.
77 llfuse.capi._notify_queue = Queue.Queue()
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
82 _logger = logging.getLogger('arvados.arvados_fuse')
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
91 """Connects a numeric file handle to a File or Directory object that has
92 been opened by the client."""
94 def __init__(self, fh, obj):
103 if self.obj.writable():
104 return self.obj.flush()
class FileHandle(Handle):
    """Handle for a File object that a client currently has open.

    Pairs the numeric file handle with the File object; everything else comes
    from `Handle`.
    """
class DirectoryHandle(Handle):
    """Handle for a Directory object that a client currently has open.

    On top of the file handle number and the directory object tracked by
    `Handle`, it remembers the entry list supplied when the handle was made.
    """

    def __init__(self, fh, dirobj, entries):
        """Set up the handle.

        fh: numeric file handle
        dirobj: the Directory object that was opened
        entries: the listing to keep on this handle (opendir passes a list
            of (name, object) pairs)
        """
        super(DirectoryHandle, self).__init__(fh, dirobj)
        # Stored as given; no copy is made here.
        self.entries = entries
122 class InodeCache(object):
123 """Records the memory footprint of objects and when they are last used.
125 When the cache limit is exceeded, the least recently used objects are
126 cleared. Clearing the object means discarding its contents to release
127 memory. The next time the object is accessed, it must be re-fetched from
128 the server. Note that the inode cache limit is a soft limit; the cache
129 limit may be exceeded if necessary to load very large objects, it may also
130 be exceeded if open file handles prevent objects from being cleared.
134 def __init__(self, cap, min_entries=4):
135 self._entries = collections.OrderedDict()
139 self.min_entries = min_entries
144 def _remove(self, obj, clear):
147 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
149 if obj.has_ref(only_children=True):
150 obj.kernel_invalidate()
151 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
154 self._total -= obj.cache_size
155 del self._entries[obj.inode]
157 self._by_uuid[obj.cache_uuid].remove(obj)
158 if not self._by_uuid[obj.cache_uuid]:
159 del self._by_uuid[obj.cache_uuid]
160 obj.cache_uuid = None
162 _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
166 if self._total > self.cap:
167 for ent in self._entries.values():
168 if self._total < self.cap or len(self._entries) < self.min_entries:
170 self._remove(ent, True)
172 def manage(self, obj):
174 obj.cache_size = obj.objsize()
175 self._entries[obj.inode] = obj
176 obj.cache_uuid = obj.uuid()
178 if obj.cache_uuid not in self._by_uuid:
179 self._by_uuid[obj.cache_uuid] = [obj]
181 if obj not in self._by_uuid[obj.cache_uuid]:
182 self._by_uuid[obj.cache_uuid].append(obj)
183 self._total += obj.objsize()
184 _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
187 def touch(self, obj):
189 if obj.inode in self._entries:
190 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking `obj` in the cache.

    Does nothing unless `obj` reports itself as persisted and its inode is
    currently among the tracked entries; otherwise hands it to `_remove`
    with clear=True.
    """
    if not obj.persisted():
        return
    if obj.inode not in self._entries:
        return
    self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return the cached objects registered under `uuid`.

    uuid: the uuid to look up in the per-uuid index

    Returns a new list, empty when nothing is registered under `uuid`.

    A copy is returned on purpose: the index lists in `_by_uuid` are edited
    in place when objects are added to or removed from the cache, so handing
    out the internal list would let a caller that iterates over the result
    skip or repeat entries if the cache changes during the loop.
    """
    return list(self._by_uuid.get(uuid, ()))
201 self._entries.clear()
202 self._by_uuid.clear()
205 class Inodes(object):
206 """Manage the set of inodes. This is the mapping from a numeric id
207 to a concrete File or Directory object"""
209 def __init__(self, inode_cache, encoding="utf-8"):
211 self._counter = itertools.count(llfuse.ROOT_INODE)
212 self.inode_cache = inode_cache
213 self.encoding = encoding
214 self.deferred_invalidations = []
216 def __getitem__(self, item):
217 return self._entries[item]
219 def __setitem__(self, key, item):
220 self._entries[key] = item
223 return self._entries.iterkeys()
226 return self._entries.items()
228 def __contains__(self, k):
229 return k in self._entries
def touch(self, entry):
    """Record that `entry` was just used.

    Stamps `entry._atime` with the current time, then passes the entry to
    the inode cache's `touch`.
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
235 def add_entry(self, entry):
236 entry.inode = next(self._counter)
237 if entry.inode == llfuse.ROOT_INODE:
239 self._entries[entry.inode] = entry
240 self.inode_cache.manage(entry)
243 def del_entry(self, entry):
244 if entry.ref_count == 0:
245 self.inode_cache.unmanage(entry)
246 del self._entries[entry.inode]
247 with llfuse.lock_released:
249 self.invalidate_inode(entry.inode)
253 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Forward an invalidation request for `inode` to
    `llfuse.invalidate_inode`."""
    notify = llfuse.invalidate_inode
    notify(inode)
def invalidate_entry(self, inode, name):
    """Forward an invalidation request for the entry `name` under `inode`
    to `llfuse.invalidate_entry`.

    `name` is encoded with this table's configured encoding before it is
    handed to llfuse.
    """
    raw_name = name.encode(self.encoding)
    llfuse.invalidate_entry(inode, raw_name)
262 self.inode_cache.clear()
264 for k,v in self._entries.items():
267 except Exception as e:
268 _logger.exception("Error during finalize of inode %i", k)
270 self._entries.clear()
def catch_exceptions(orig_func):
    """Catch uncaught exceptions and log them consistently.

    Decorator for request handler methods.  The wrapped method's return
    value is passed through; any failure leaves the wrapper as an
    `llfuse.FUSEError`:

    - `llfuse.FUSEError` is re-raised unchanged.
    - `EnvironmentError` becomes a FUSEError carrying the same errno, or
      EIO when the exception has no usable errno.
    - `arvados.errors.KeepWriteError` and `arvados.errors.NotFoundError`
      are logged and become EIO.
    - anything else is logged with its traceback and becomes EIO.
    """

    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            # Already the error type the caller expects; leave it untouched.
            raise
        except EnvironmentError as e:
            # An EnvironmentError built from a bare message (for example
            # IOError("msg")) has errno None.  Passing that on would fail
            # while constructing the FUSEError, inside this handler where no
            # other clause can catch it, so report a generic I/O error.
            code = e.errno if isinstance(e.errno, int) else errno.EIO
            raise llfuse.FUSEError(code)
        except arvados.errors.KeepWriteError as e:
            # Let logging format the message so a failure to stringify the
            # exception cannot escape from this handler.
            _logger.error("Keep write error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except arvados.errors.NotFoundError as e:
            _logger.error("Block not found error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except:
            # Deliberate catch-all at the request handler boundary.
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
297 class Operations(llfuse.Operations):
298 """This is the main interface with llfuse.
300 The methods on this object are called by llfuse threads to service FUSE
301 events to query and read from the file system.
303 llfuse has its own global lock which is acquired before calling a request handler,
304 so request handlers do not run concurrently unless the lock is explicitly released
305 using 'with llfuse.lock_released:'
309 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
310 super(Operations, self).__init__()
312 self._api_client = api_client
315 inode_cache = InodeCache(cap=256*1024*1024)
316 self.inodes = Inodes(inode_cache, encoding=encoding)
319 self.enable_write = enable_write
321 # dict of inode to filehandle
322 self._filehandles = {}
323 self._filehandles_counter = itertools.count(0)
325 # Other threads that need to wait until the fuse driver
326 # is fully initialized should wait() on this event object.
327 self.initlock = threading.Event()
329 # If we get overlapping shutdown events (e.g., fusermount -u
330 # -z and operations.destroy()) llfuse calls forget() on inodes
331 # that have already been deleted. To avoid this, we make
332 # forget() a no-op if called after destroy().
333 self._shutdown_started = threading.Event()
335 self.num_retries = num_retries
337 self.read_counter = arvados.keep.Counter()
338 self.write_counter = arvados.keep.Counter()
339 self.read_ops_counter = arvados.keep.Counter()
340 self.write_ops_counter = arvados.keep.Counter()
345 # Allow threads that are waiting for the driver to be finished
346 # initializing to continue
352 self._shutdown_started.set()
359 def access(self, inode, mode, ctx):
362 def listen_for_events(self):
363 self.events = arvados.events.subscribe(self._api_client,
364 [["event_type", "in", ["create", "update", "delete"]]],
368 def on_event(self, ev):
369 if 'event_type' not in ev:
372 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
374 if ev["object_kind"] == "arvados#collection":
375 new_attr = (ev.get("properties") and
376 ev["properties"].get("new_attributes") and
377 ev["properties"]["new_attributes"])
379 # new_attributes.modified_at currently lacks
380 # subsecond precision (see #6347) so use event_at
381 # which should always be the same.
383 (ev["event_at"], new_attr["portable_data_hash"])
384 if new_attr else None)
386 item.update(to_record_version=record_version)
391 ev.get("properties") and
392 ev["properties"].get("old_attributes") and
393 ev["properties"]["old_attributes"].get("owner_uuid"))
394 newowner = ev["object_owner_uuid"]
396 self.inodes.inode_cache.find_by_uuid(oldowner) +
397 self.inodes.inode_cache.find_by_uuid(newowner)):
403 def getattr(self, inode):
404 if inode not in self.inodes:
405 raise llfuse.FUSEError(errno.ENOENT)
407 e = self.inodes[inode]
409 entry = llfuse.EntryAttributes()
412 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
413 entry.attr_timeout = 60 if e.allow_attr_cache else 0
415 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
416 if isinstance(e, Directory):
417 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
419 entry.st_mode |= stat.S_IFREG
420 if isinstance(e, FuseArvadosFile):
421 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
423 if self.enable_write and e.writable():
424 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
427 entry.st_uid = self.uid
428 entry.st_gid = self.gid
431 entry.st_size = e.size()
433 entry.st_blksize = 512
434 entry.st_blocks = (entry.st_size/512)+1
435 entry.st_atime = int(e.atime())
436 entry.st_mtime = int(e.mtime())
437 entry.st_ctime = int(e.mtime())
442 def setattr(self, inode, attr):
443 entry = self.getattr(inode)
445 e = self.inodes[inode]
447 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
448 with llfuse.lock_released:
449 e.arvfile.truncate(attr.st_size)
450 entry.st_size = e.arvfile.size()
455 def lookup(self, parent_inode, name):
456 name = unicode(name, self.inodes.encoding)
462 if parent_inode in self.inodes:
463 p = self.inodes[parent_inode]
466 inode = p.parent_inode
467 elif isinstance(p, Directory) and name in p:
468 inode = p[name].inode
471 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
472 parent_inode, name, inode)
473 self.inodes[inode].inc_ref()
474 return self.getattr(inode)
476 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
478 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Drop lookup references reported in `inodes`.

    inodes: iterable of (inode, nlookup) pairs

    Each entry's reference count is lowered by its nlookup value; an entry
    whose count reaches zero and that is flagged dead is deleted from the
    inode table.  Once shutdown has started the call is a no-op, because
    overlapping shutdown events can make llfuse report inodes that were
    already deleted.
    """
    if not self._shutdown_started.is_set():
        for inode, nlookup in inodes:
            entry = self.inodes[inode]
            _logger.debug(
                "arv-mount forget: inode %i nlookup %i ref_count %i",
                inode, nlookup, entry.ref_count)
            remaining = entry.dec_ref(nlookup)
            # `dead` is only consulted when the count has dropped to zero.
            if remaining == 0 and entry.dead:
                self.inodes.del_entry(entry)
491 def open(self, inode, flags):
492 if inode in self.inodes:
493 p = self.inodes[inode]
495 raise llfuse.FUSEError(errno.ENOENT)
497 if isinstance(p, Directory):
498 raise llfuse.FUSEError(errno.EISDIR)
500 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
501 raise llfuse.FUSEError(errno.EPERM)
503 fh = next(self._filehandles_counter)
504 self._filehandles[fh] = FileHandle(fh, p)
507 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
512 def read(self, fh, off, size):
513 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
514 self.read_ops_counter.add(1)
516 if fh in self._filehandles:
517 handle = self._filehandles[fh]
519 raise llfuse.FUSEError(errno.EBADF)
521 self.inodes.touch(handle.obj)
523 r = handle.obj.readfrom(off, size, self.num_retries)
525 self.read_counter.add(len(r))
529 def write(self, fh, off, buf):
530 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
531 self.write_ops_counter.add(1)
533 if fh in self._filehandles:
534 handle = self._filehandles[fh]
536 raise llfuse.FUSEError(errno.EBADF)
538 if not handle.obj.writable():
539 raise llfuse.FUSEError(errno.EPERM)
541 self.inodes.touch(handle.obj)
543 w = handle.obj.writeto(off, buf, self.num_retries)
545 self.write_counter.add(w)
549 def release(self, fh):
550 if fh in self._filehandles:
552 self._filehandles[fh].flush()
556 self._filehandles[fh].release()
557 del self._filehandles[fh]
558 self.inodes.inode_cache.cap_cache()
560 def releasedir(self, fh):
564 def opendir(self, inode):
565 _logger.debug("arv-mount opendir: inode %i", inode)
567 if inode in self.inodes:
568 p = self.inodes[inode]
570 raise llfuse.FUSEError(errno.ENOENT)
572 if not isinstance(p, Directory):
573 raise llfuse.FUSEError(errno.ENOTDIR)
575 fh = next(self._filehandles_counter)
576 if p.parent_inode in self.inodes:
577 parent = self.inodes[p.parent_inode]
579 raise llfuse.FUSEError(errno.EIO)
584 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
588 def readdir(self, fh, off):
589 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
591 if fh in self._filehandles:
592 handle = self._filehandles[fh]
594 raise llfuse.FUSEError(errno.EBADF)
597 while e < len(handle.entries):
598 if handle.entries[e][1].inode in self.inodes:
599 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
604 st = llfuse.StatvfsData()
605 st.f_bsize = 128 * 1024
618 def _check_writable(self, inode_parent):
619 if not self.enable_write:
620 raise llfuse.FUSEError(errno.EROFS)
622 if inode_parent in self.inodes:
623 p = self.inodes[inode_parent]
625 raise llfuse.FUSEError(errno.ENOENT)
627 if not isinstance(p, Directory):
628 raise llfuse.FUSEError(errno.ENOTDIR)
631 raise llfuse.FUSEError(errno.EPERM)
636 def create(self, inode_parent, name, mode, flags, ctx):
637 _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
639 p = self._check_writable(inode_parent)
642 # The file entry should have been implicitly created by callback.
644 fh = next(self._filehandles_counter)
645 self._filehandles[fh] = FileHandle(fh, f)
649 return (fh, self.getattr(f.inode))
652 def mkdir(self, inode_parent, name, mode, ctx):
653 _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
655 p = self._check_writable(inode_parent)
658 # The dir entry should have been implicitly created by callback.
662 return self.getattr(d.inode)
665 def unlink(self, inode_parent, name):
666 _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
667 p = self._check_writable(inode_parent)
671 def rmdir(self, inode_parent, name):
672 _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
673 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Rename `name_old` under one parent to `name_new` under another.

    Both parent inodes are passed through `_check_writable`, old parent
    first, and any error it raises propagates.  The move itself is delegated
    to the destination directory's `rename`, which receives the source
    directory as its last argument.
    """
    _logger.debug(
        "arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'",
        inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
685 if fh in self._filehandles:
686 self._filehandles[fh].flush()
688 def fsync(self, fh, datasync):
691 def fsyncdir(self, fh, datasync):