1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before returning the result.
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
70 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
71 from fusefile import StringFile, FuseArvadosFile
73 _logger = logging.getLogger('arvados.arvados_fuse')
75 # Uncomment this to enable llfuse debug logging.
76 # log_handler = logging.StreamHandler()
77 # llogger = logging.getLogger('llfuse')
78 # llogger.addHandler(log_handler)
79 # llogger.setLevel(logging.DEBUG)
82 """Connects a numeric file handle to a File or Directory object that has
83 been opened by the client."""
85 def __init__(self, fh, obj):
94 return self.obj.flush()
class FileHandle(Handle):
    """Handle for a regular file that a client has opened.

    Associates the numeric file handle with the File object it refers to.
    No members are added here; everything is inherited from `Handle`.
    """
class DirectoryHandle(Handle):
    """Handle for a directory that a client has opened.

    Besides the numeric file handle and the Directory object (both passed on
    to `Handle`), it keeps the `entries` sequence supplied by the caller so
    that later directory reads can index into it.
    """

    def __init__(self, fh, dirobj, entries):
        # The base class receives the handle number and the directory object.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        # Stored as given; no copy is made.
        self.entries = entries
112 class InodeCache(object):
113 """Records the memory footprint of objects and when they are last used.
115 When the cache limit is exceeded, the least recently used objects are
116 cleared. Clearing the object means discarding its contents to release
117 memory. The next time the object is accessed, it must be re-fetched from
118 the server. Note that the inode cache limit is a soft limit; the cache
119 limit may be exceeded if necessary to load very large objects, it may also
120 be exceeded if open file handles prevent objects from being cleared.
124 def __init__(self, cap, min_entries=4):
125 self._entries = collections.OrderedDict()
127 self._counter = itertools.count(0)
130 self.min_entries = min_entries
135 def _remove(self, obj, clear):
136 if clear and not obj.clear():
137 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
139 self._total -= obj.cache_size
140 del self._entries[obj.cache_priority]
142 del self._by_uuid[obj.cache_uuid]
143 obj.cache_uuid = None
145 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
149 if self._total > self.cap:
150 for key in list(self._entries.keys()):
151 if self._total < self.cap or len(self._entries) < self.min_entries:
153 self._remove(self._entries[key], True)
155 def manage(self, obj):
157 obj.cache_priority = next(self._counter)
158 obj.cache_size = obj.objsize()
159 self._entries[obj.cache_priority] = obj
160 obj.cache_uuid = obj.uuid()
162 self._by_uuid[obj.cache_uuid] = obj
163 self._total += obj.objsize()
164 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
167 obj.cache_priority = None
169 def touch(self, obj):
171 if obj.cache_priority in self._entries:
172 self._remove(obj, False)
def unmanage(self, obj):
    """Drop `obj` from the cache, asking it to clear its contents.

    Nothing happens unless `obj.persisted()` is true and the object's
    `cache_priority` is currently a key of `_entries`; in that case the
    work is delegated to `_remove(obj, True)`.
    """
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    self._remove(obj, True)
def find(self, uuid):
    """Return the cached object registered under `uuid`, or None if absent."""
    by_uuid = self._by_uuid
    return by_uuid.get(uuid)
182 class Inodes(object):
183 """Manage the set of inodes. This is the mapping from a numeric id
184 to a concrete File or Directory object"""
186 def __init__(self, inode_cache, encoding="utf-8"):
188 self._counter = itertools.count(llfuse.ROOT_INODE)
189 self.inode_cache = inode_cache
190 self.encoding = encoding
def __getitem__(self, item):
    """Return the object stored in the inode table under key `item`.

    A missing key propagates whatever error the underlying `_entries`
    container raises.
    """
    table = self._entries
    return table[item]
def __setitem__(self, key, item):
    """Store `item` in the inode table under `key`, replacing any previous value."""
    table = self._entries
    table[key] = item
199 return self._entries.iterkeys()
202 return self._entries.items()
def __contains__(self, k):
    """Report whether `k` is a key of the inode table."""
    present = k in self._entries
    return present
def touch(self, entry):
    """Mark `entry` as just used.

    Stamps the entry's `_atime` with the current wall-clock time and then
    forwards the entry to the inode cache's own `touch()`.
    """
    now = time.time()
    entry._atime = now
    cache = self.inode_cache
    cache.touch(entry)
211 def add_entry(self, entry):
212 entry.inode = next(self._counter)
213 if entry.inode == llfuse.ROOT_INODE:
215 self._entries[entry.inode] = entry
216 self.inode_cache.manage(entry)
219 def del_entry(self, entry):
220 if entry.ref_count == 0:
221 _logger.debug("Deleting inode %i", entry.inode)
222 self.inode_cache.unmanage(entry)
223 llfuse.invalidate_inode(entry.inode)
225 del self._entries[entry.inode]
229 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
232 def catch_exceptions(orig_func):
233 """Catch uncaught exceptions and log them consistently."""
235 @functools.wraps(orig_func)
236 def catch_exceptions_wrapper(self, *args, **kwargs):
238 return orig_func(self, *args, **kwargs)
239 except llfuse.FUSEError:
241 except EnvironmentError as e:
242 raise llfuse.FUSEError(e.errno)
244 _logger.exception("Unhandled exception during FUSE operation")
245 raise llfuse.FUSEError(errno.EIO)
247 return catch_exceptions_wrapper
250 class Operations(llfuse.Operations):
251 """This is the main interface with llfuse.
253 The methods on this object are called by llfuse threads to service FUSE
254 events to query and read from the file system.
256 llfuse has its own global lock which is acquired before calling a request handler,
257 so request handlers do not run concurrently unless the lock is explicitly released
258 using 'with llfuse.lock_released:'
262 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
263 super(Operations, self).__init__()
266 inode_cache = InodeCache(cap=256*1024*1024)
267 self.inodes = Inodes(inode_cache, encoding=encoding)
270 self.enable_write = enable_write
272 # dict of numeric file handle to Handle object (FileHandle or DirectoryHandle)
273 self._filehandles = {}
274 self._filehandles_counter = itertools.count(0)
276 # Other threads that need to wait until the fuse driver
277 # is fully initialized should wait() on this event object.
278 self.initlock = threading.Event()
280 self.num_retries = num_retries
285 # Allow threads that are waiting for the driver to be finished
286 # initializing to continue
295 for k,v in self.inodes.items():
299 def access(self, inode, mode, ctx):
302 def listen_for_events(self, api_client):
303 self.events = arvados.events.subscribe(api_client,
304 [["event_type", "in", ["create", "update", "delete"]]],
308 def on_event(self, ev):
309 if 'event_type' in ev:
311 item = self.inodes.inode_cache.find(ev["object_uuid"])
314 if ev["object_kind"] == "arvados#collection":
315 new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
316 record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
317 item.update(to_record_version=record_version)
321 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
322 olditemparent = self.inodes.inode_cache.find(oldowner)
323 if olditemparent is not None:
324 olditemparent.invalidate()
325 olditemparent.update()
327 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
328 if itemparent is not None:
329 itemparent.invalidate()
333 def getattr(self, inode):
334 if inode not in self.inodes:
335 raise llfuse.FUSEError(errno.ENOENT)
337 e = self.inodes[inode]
339 entry = llfuse.EntryAttributes()
342 entry.entry_timeout = 60
343 entry.attr_timeout = 60
345 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
346 if isinstance(e, Directory):
347 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
349 entry.st_mode |= stat.S_IFREG
350 if isinstance(e, FuseArvadosFile):
351 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
353 if self.enable_write and e.writable():
354 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
357 entry.st_uid = self.uid
358 entry.st_gid = self.gid
361 entry.st_size = e.size()
363 entry.st_blksize = 512
364 entry.st_blocks = (entry.st_size/512)+1
365 entry.st_atime = int(e.atime())
366 entry.st_mtime = int(e.mtime())
367 entry.st_ctime = int(e.mtime())
372 def setattr(self, inode, attr):
373 entry = self.getattr(inode)
375 e = self.inodes[inode]
377 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
378 with llfuse.lock_released:
379 e.arvfile.truncate(attr.st_size)
380 entry.st_size = e.arvfile.size()
385 def lookup(self, parent_inode, name):
386 name = unicode(name, self.inodes.encoding)
392 if parent_inode in self.inodes:
393 p = self.inodes[parent_inode]
395 inode = p.parent_inode
396 elif isinstance(p, Directory) and name in p:
397 inode = p[name].inode
400 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
401 parent_inode, name, inode)
402 self.inodes[inode].inc_ref()
403 return self.getattr(inode)
405 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
407 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release lookup references on a batch of inodes.

    `inodes` is iterated as (inode, nlookup) pairs. For each pair the
    entry's reference count is reduced via `dec_ref(nlookup)`; when that
    returns 0 and the entry is flagged `dead`, the entry is removed from
    the inode table with `del_entry()`.
    """
    table = self.inodes
    for inode, nlookup in inodes:
        node = table[inode]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, node.ref_count)
        # `dead` is only consulted once the reference count has reached zero.
        if node.dec_ref(nlookup) == 0:
            if node.dead:
                table.del_entry(node)
418 def open(self, inode, flags):
419 if inode in self.inodes:
420 p = self.inodes[inode]
422 raise llfuse.FUSEError(errno.ENOENT)
424 if isinstance(p, Directory):
425 raise llfuse.FUSEError(errno.EISDIR)
427 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
428 raise llfuse.FUSEError(errno.EPERM)
430 fh = next(self._filehandles_counter)
431 self._filehandles[fh] = FileHandle(fh, p)
436 def read(self, fh, off, size):
437 _logger.debug("arv-mount read %i %i %i", fh, off, size)
438 if fh in self._filehandles:
439 handle = self._filehandles[fh]
441 raise llfuse.FUSEError(errno.EBADF)
443 self.inodes.touch(handle.obj)
446 return handle.obj.readfrom(off, size, self.num_retries)
447 except arvados.errors.NotFoundError as e:
448 _logger.error("Block not found: " + str(e))
449 raise llfuse.FUSEError(errno.EIO)
452 def write(self, fh, off, buf):
453 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
454 if fh in self._filehandles:
455 handle = self._filehandles[fh]
457 raise llfuse.FUSEError(errno.EBADF)
459 if not handle.obj.writable():
460 raise llfuse.FUSEError(errno.EPERM)
462 self.inodes.touch(handle.obj)
464 return handle.obj.writeto(off, buf, self.num_retries)
467 def release(self, fh):
468 if fh in self._filehandles:
470 self._filehandles[fh].flush()
471 except EnvironmentError as e:
472 raise llfuse.FUSEError(e.errno)
474 _logger.exception("Flush error")
475 self._filehandles[fh].release()
476 del self._filehandles[fh]
477 self.inodes.inode_cache.cap_cache()
479 def releasedir(self, fh):
483 def opendir(self, inode):
484 _logger.debug("arv-mount opendir: inode %i", inode)
486 if inode in self.inodes:
487 p = self.inodes[inode]
489 raise llfuse.FUSEError(errno.ENOENT)
491 if not isinstance(p, Directory):
492 raise llfuse.FUSEError(errno.ENOTDIR)
494 fh = next(self._filehandles_counter)
495 if p.parent_inode in self.inodes:
496 parent = self.inodes[p.parent_inode]
498 raise llfuse.FUSEError(errno.EIO)
503 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
507 def readdir(self, fh, off):
508 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
510 if fh in self._filehandles:
511 handle = self._filehandles[fh]
513 raise llfuse.FUSEError(errno.EBADF)
515 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
518 while e < len(handle.entries):
519 if handle.entries[e][1].inode in self.inodes:
520 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
525 st = llfuse.StatvfsData()
526 st.f_bsize = 128 * 1024
539 def _check_writable(self, inode_parent):
540 if not self.enable_write:
541 raise llfuse.FUSEError(errno.EROFS)
543 if inode_parent in self.inodes:
544 p = self.inodes[inode_parent]
546 raise llfuse.FUSEError(errno.ENOENT)
548 if not isinstance(p, Directory):
549 raise llfuse.FUSEError(errno.ENOTDIR)
552 raise llfuse.FUSEError(errno.EPERM)
557 def create(self, inode_parent, name, mode, flags, ctx):
558 p = self._check_writable(inode_parent)
561 # The file entry should have been implicitly created by callback.
563 fh = next(self._filehandles_counter)
564 self._filehandles[fh] = FileHandle(fh, f)
568 return (fh, self.getattr(f.inode))
571 def mkdir(self, inode_parent, name, mode, ctx):
572 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
574 p = self._check_writable(inode_parent)
577 # The dir entry should have been implicitly created by callback.
581 return self.getattr(d.inode)
584 def unlink(self, inode_parent, name):
585 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
586 p = self._check_writable(inode_parent)
590 def rmdir(self, inode_parent, name):
591 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
592 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move the entry `name_old` in one directory to `name_new` in another.

    Both parent inodes are passed through `_check_writable()` (source
    first, then destination); the objects it returns are used as the
    source and destination directories. The move itself is delegated to
    the destination directory's `rename(name_old, name_new, source_dir)`.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    old_parent = self._check_writable(inode_parent_old)
    new_parent = self._check_writable(inode_parent_new)
    new_parent.rename(name_old, name_new, old_parent)
604 if fh in self._filehandles:
605 self._filehandles[fh].flush()
607 def fsync(self, fh, datasync):
610 def fsyncdir(self, fh, datasync):