1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before
returning a result.
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
# Default _notify_queue has a limit of 1000 items, but it really needs to be
# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# details.
# Swap llfuse's notification queue for one without a size limit.
llfuse.capi._notify_queue = Queue.Queue()

from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
from fusefile import StringFile, FuseArvadosFile

# Module-wide logger used by every class and request handler below.
_logger = logging.getLogger('arvados.arvados_fuse')

# Uncomment this to enable llfuse debug logging.
# log_handler = logging.StreamHandler()
# llogger = logging.getLogger('llfuse')
# llogger.addHandler(log_handler)
# llogger.setLevel(logging.DEBUG)
class Handle(object):
    """Connects a numeric file handle to a File or Directory object that has
    been opened by the client.

    Attributes:
        fh: the numeric handle given to the client.
        obj: the file system object the handle refers to.
    """

    def __init__(self, fh, obj):
        """Bind handle number `fh` to `obj` and mark `obj` as in use."""
        self.fh = fh
        self.obj = obj
        # NOTE(review): inc_use()/dec_use() are the use-count hooks assumed to
        # pair with the in_use() query the inode cache logs -- confirm against
        # the object base class.
        self.obj.inc_use()

    def release(self):
        """Drop this handle's use of the underlying object."""
        self.obj.dec_use()

    def flush(self):
        """Flush the underlying object if it is writable.

        Returns whatever `obj.flush()` returns, or None for read-only objects.
        """
        if self.obj.writable():
            return self.obj.flush()
class FileHandle(Handle):
    """Connects a numeric file handle to a File object that has
    been opened by the client."""

    # No behavior beyond Handle; the subclass exists so file handles can be
    # told apart from directory handles by type.
    pass
class DirectoryHandle(Handle):
    """Connects a numeric file handle to a Directory object that has
    been opened by the client.

    Attributes:
        entries: the directory listing captured when the handle was created,
            as a sequence of (name, object) pairs.
    """

    def __init__(self, fh, dirobj, entries):
        """Bind `fh` to `dirobj` and remember the `entries` snapshot."""
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
class InodeCache(object):
    """Records the memory footprint of objects and when they are last used.

    When the cache limit is exceeded, the least recently used objects are
    cleared.  Clearing the object means discarding its contents to release
    memory.  The next time the object is accessed, it must be re-fetched from
    the server.  Note that the inode cache limit is a soft limit; the cache
    limit may be exceeded if necessary to load very large objects, it may also
    be exceeded if open file handles prevent objects from being cleared.
    """

    def __init__(self, cap, min_entries=4):
        """Create an empty cache.

        cap: soft limit on the summed `objsize()` of managed objects.
        min_entries: eviction stops once fewer than this many entries remain.
        """
        # Keyed by a monotonically increasing priority, so iteration order is
        # least-recently-touched first.
        self._entries = collections.OrderedDict()
        # uuid -> list of managed objects sharing that uuid.
        self._by_uuid = {}
        self._counter = itertools.count(0)
        self.cap = cap
        self._total = 0
        self.min_entries = min_entries

    def total(self):
        """Return the summed recorded size of all managed objects."""
        # NOTE(review): accessor restored from upstream; nothing in this
        # excerpt calls it.
        return self._total

    def _remove(self, obj, clear):
        """Stop tracking `obj`; when `clear` is true, first ask it to clear.

        Returns False (leaving the object tracked) if `obj.clear()` refuses,
        True otherwise.
        """
        if clear and not obj.clear():
            _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
            return False
        self._total -= obj.cache_size
        del self._entries[obj.cache_priority]
        if obj.cache_uuid:
            siblings = self._by_uuid[obj.cache_uuid]
            siblings.remove(obj)
            if not siblings:
                del self._by_uuid[obj.cache_uuid]
            obj.cache_uuid = None
        if clear:
            _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
        return True

    def cap_cache(self):
        """Clear oldest entries until the cache is back under `cap`."""
        if self._total > self.cap:
            # Iterate over a snapshot of the keys: _remove mutates _entries.
            for key in list(self._entries.keys()):
                if self._total < self.cap or len(self._entries) < self.min_entries:
                    break
                self._remove(self._entries[key], True)

    def manage(self, obj):
        """Start tracking `obj` as the most recently used entry.

        Objects that are not persisted are not tracked; their
        `cache_priority` is set to None instead.
        """
        if not obj.persisted():
            obj.cache_priority = None
            return

        obj.cache_priority = next(self._counter)
        obj.cache_size = obj.objsize()
        self._entries[obj.cache_priority] = obj
        obj.cache_uuid = obj.uuid()
        if obj.cache_uuid:
            peers = self._by_uuid.setdefault(obj.cache_uuid, [])
            if obj not in peers:
                peers.append(obj)
        # Add exactly the size that _remove will later subtract; calling
        # objsize() a second time could return a different value and make
        # the running total drift.
        self._total += obj.cache_size
        _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.cache_size, obj.cache_uuid, self._total)
        self.cap_cache()

    def touch(self, obj):
        """Mark `obj` as most recently used and refresh its recorded size."""
        if obj.persisted():
            if obj.cache_priority in self._entries:
                self._remove(obj, False)
            self.manage(obj)

    def unmanage(self, obj):
        """Clear `obj` and stop tracking it, if it is currently tracked."""
        if obj.persisted() and obj.cache_priority in self._entries:
            self._remove(obj, True)

    def find(self, uuid):
        """Return the list of tracked objects with this uuid, or None."""
        return self._by_uuid.get(uuid)

    def clear(self):
        """Forget every tracked object without clearing the objects."""
        self._entries.clear()
        self._by_uuid.clear()
        self._total = 0
class Inodes(object):
    """Manage the set of inodes.  This is the mapping from a numeric id
    to a concrete File or Directory object"""

    def __init__(self, inode_cache, encoding="utf-8"):
        """Create an empty table.

        inode_cache: the InodeCache that tracks entries added here.
        encoding: character encoding used for file names.
        """
        self._entries = {}
        # The first entry added receives llfuse.ROOT_INODE.
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.inode_cache = inode_cache
        self.encoding = encoding
        self.deferred_invalidations = []

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        # iter() over the dict yields its keys on Python 2 and 3 alike.
        return iter(self._entries)

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def touch(self, entry):
        """Record an access to `entry`: update its atime and cache recency."""
        entry._atime = time.time()
        self.inode_cache.touch(entry)

    def add_entry(self, entry):
        """Assign the next inode number to `entry`, register it, return it."""
        entry.inode = next(self._counter)
        if entry.inode == llfuse.ROOT_INODE:
            # The root directory is its own parent.
            entry.parent_inode = llfuse.ROOT_INODE
        self._entries[entry.inode] = entry
        self.inode_cache.manage(entry)
        return entry

    def del_entry(self, entry):
        """Remove `entry` now if unreferenced, otherwise mark it dead.

        A dead entry is removed later, by Operations.forget, once its
        reference count reaches zero.
        """
        if entry.ref_count == 0:
            self.inode_cache.unmanage(entry)
            del self._entries[entry.inode]
            # Finalizing runs with the llfuse global lock released.
            with llfuse.lock_released:
                entry.finalize()
            self.invalidate_inode(entry.inode)
            entry.inode = None
        else:
            entry.dead = True
            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)

    def invalidate_inode(self, inode):
        llfuse.invalidate_inode(inode)

    def invalidate_entry(self, inode, name):
        llfuse.invalidate_entry(inode, name)

    def clear(self):
        """Finalize and drop every entry; used when tearing down the mount."""
        self.inode_cache.clear()

        for k, v in self._entries.items():
            try:
                v.finalize()
            except Exception:
                # Best effort: one failing entry must not stop the teardown.
                _logger.exception("Error during finalize of inode %i", k)

        self._entries.clear()
def catch_exceptions(orig_func):
    """Catch uncaught exceptions and log them consistently.

    Decorator for request-handler methods.  Whatever the wrapped method
    raises is converted to llfuse.FUSEError:

    - llfuse.FUSEError passes through unchanged;
    - EnvironmentError keeps its errno (EIO when it carries none);
    - Keep write errors, not-found errors and any other exception are
      logged and reported as EIO.
    """

    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            raise
        except EnvironmentError as e:
            # errno may be None when the exception was built from a message
            # only; fall back to EIO rather than fail constructing FUSEError.
            raise llfuse.FUSEError(e.errno or errno.EIO)
        except arvados.errors.KeepWriteError as e:
            _logger.error("Keep write error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except arvados.errors.NotFoundError as e:
            _logger.error("Block not found error: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            # NOTE(review): the original catch-all clause is not visible in
            # this excerpt; Exception is used so that interpreter-exit
            # signals are not swallowed -- confirm.
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
294 class Operations(llfuse.Operations):
295 """This is the main interface with llfuse.
297 The methods on this object are called by llfuse threads to service FUSE
298 events to query and read from the file system.
300 llfuse has its own global lock which is acquired before calling a request handler,
301 so request handlers do not run concurrently unless the lock is explicitly released
302 using 'with llfuse.lock_released:'
def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
    """Set up the state for one mount point.

    uid, gid: owner ids reported for every file and directory.
    encoding: character encoding used for file names.
    inode_cache: InodeCache to use; a 256 MiB cache is created when omitted.
    num_retries: passed to file objects on every read and write.
    enable_write: when false, every modifying operation is refused.
    """
    super(Operations, self).__init__()

    if not inode_cache:
        inode_cache = InodeCache(cap=256*1024*1024)
    self.inodes = Inodes(inode_cache, encoding=encoding)
    self.uid = uid
    self.gid = gid
    self.enable_write = enable_write

    # dict of inode to filehandle
    self._filehandles = {}
    self._filehandles_counter = itertools.count(0)

    # Other threads that need to wait until the fuse driver
    # is fully initialized should wait() on this event object.
    self.initlock = threading.Event()

    self.num_retries = num_retries

    # Event-bus subscription; set by listen_for_events().
    self.events = None
329 # Allow threads that are waiting for the driver to be finished
330 # initializing to continue
def access(self, inode, mode, ctx):
    """Permission check hook; every access is allowed."""
    # NOTE(review): body reconstructed -- the original statement is not
    # visible in this excerpt.
    return True
def listen_for_events(self, api_client):
    """Subscribe to create/update/delete events and route them to on_event.

    The subscription object is kept in `self.events`.
    """
    event_filter = [["event_type", "in", ["create", "update", "delete"]]]
    self.events = arvados.events.subscribe(api_client, event_filter, self.on_event)
def on_event(self, ev):
    """Refresh cached objects affected by an event-bus message.

    Objects cached under the event's `object_uuid` are invalidated and
    updated, followed by the cached objects for the previous owner and the
    current owner, so directory listings reflect the change.  Messages
    without an `event_type` key are ignored.
    """
    if 'event_type' not in ev:
        return

    # Request handlers run under the llfuse global lock; take it here as
    # well, since this callback is invoked by the event subscription.
    with llfuse.lock:
        cache = self.inodes.inode_cache
        properties = ev.get("properties") or {}

        # Iterate over copies: updating an object may re-register it in the
        # cache and so mutate the list that find() returned.
        for item in list(cache.find(ev["object_uuid"]) or ()):
            item.invalidate()
            if ev["object_kind"] == "arvados#collection":
                new_attr = properties.get("new_attributes")

                # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
                # should always be the same.
                record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None

                item.update(to_record_version=record_version)
            else:
                item.update()

        # find() returns a list of objects (or None), so every match has to
        # be refreshed individually rather than calling methods on the list.
        oldowner = (properties.get("old_attributes") or {}).get("owner_uuid")
        for owner_uuid in (oldowner, ev["object_owner_uuid"]):
            for parent in list(cache.find(owner_uuid) or ()):
                parent.invalidate()
                parent.update()
def getattr(self, inode):
    """Return the llfuse.EntryAttributes describing `inode`.

    Raises llfuse.FUSEError(ENOENT) if the inode is unknown.
    """
    if inode not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)

    e = self.inodes[inode]

    entry = llfuse.EntryAttributes()
    # NOTE(review): st_ino, generation, st_nlink and st_rdev are
    # reconstructed; their original assignments are not visible in this
    # excerpt -- confirm.
    entry.st_ino = inode
    entry.generation = 0
    entry.entry_timeout = 60
    entry.attr_timeout = 60

    # Everything is world-readable; directories and Arvados-backed files
    # are also executable.
    entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    if isinstance(e, Directory):
        entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
    else:
        entry.st_mode |= stat.S_IFREG
        if isinstance(e, FuseArvadosFile):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    if self.enable_write and e.writable():
        entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

    entry.st_nlink = 1
    entry.st_uid = self.uid
    entry.st_gid = self.gid
    entry.st_rdev = 0

    entry.st_size = e.size()

    entry.st_blksize = 512
    # Floor division keeps the block count an integer on every Python version.
    entry.st_blocks = (entry.st_size // 512) + 1
    entry.st_atime = int(e.atime())
    entry.st_mtime = int(e.mtime())
    entry.st_ctime = int(e.mtime())

    return entry
def setattr(self, inode, attr):
    """Apply attribute changes to `inode`; only size changes are honoured.

    When `attr.st_size` is set and the inode is a FuseArvadosFile, the file
    is truncated to that size.  Returns the resulting attributes.
    """
    entry = self.getattr(inode)

    e = self.inodes[inode]

    if attr.st_size is not None and isinstance(e, FuseArvadosFile):
        # The truncate runs with the llfuse global lock released.
        with llfuse.lock_released:
            e.arvfile.truncate(attr.st_size)
            entry.st_size = e.arvfile.size()

    return entry
def lookup(self, parent_inode, name):
    """Resolve `name` inside directory `parent_inode`.

    On success the target's reference count is incremented and its
    attributes are returned.  Raises llfuse.FUSEError(ENOENT) when the name
    does not resolve.
    """
    # `name` is expected to be a byte string; decode() behaves the same on
    # Python 2 and 3.
    name = name.decode(self.inodes.encoding)
    inode = None

    if name == '.':
        inode = parent_inode
    elif parent_inode in self.inodes:
        p = self.inodes[parent_inode]
        if name == '..':
            inode = p.parent_inode
        elif isinstance(p, Directory) and name in p:
            inode = p[name].inode

    if inode is not None:
        _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                      parent_inode, name, inode)
        self.inodes[inode].inc_ref()
        return self.getattr(inode)

    _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
                  parent_inode, name)
    raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release lookup references.

    inodes: iterable of (inode, nlookup) pairs; each inode's reference count
    is reduced by nlookup.  Entries already marked dead are deleted once the
    count reaches zero.
    """
    for inode, nlookup in inodes:
        ent = self.inodes[inode]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
        if ent.dec_ref(nlookup) == 0 and ent.dead:
            self.inodes.del_entry(ent)
def open(self, inode, flags):
    """Open the file at `inode` and return a new file handle number.

    Raises llfuse.FUSEError with ENOENT for an unknown inode, EISDIR for a
    directory, or EPERM when write access is requested on a read-only file.
    """
    if inode not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)
    p = self.inodes[inode]

    if isinstance(p, Directory):
        raise llfuse.FUSEError(errno.EISDIR)

    wants_write = (flags & os.O_WRONLY) or (flags & os.O_RDWR)
    if wants_write and not p.writable():
        raise llfuse.FUSEError(errno.EPERM)

    fh = next(self._filehandles_counter)
    self._filehandles[fh] = FileHandle(fh, p)
    self.inodes.touch(p)
    return fh
def read(self, fh, off, size):
    """Read `size` bytes at offset `off` from the file open as `fh`.

    Returns whatever the file object's `readfrom` returns.  Raises
    llfuse.FUSEError(EBADF) for an unknown handle.
    """
    _logger.debug("arv-mount read %i %i %i", fh, off, size)
    if fh not in self._filehandles:
        raise llfuse.FUSEError(errno.EBADF)
    handle = self._filehandles[fh]

    self.inodes.touch(handle.obj)

    return handle.obj.readfrom(off, size, self.num_retries)
def write(self, fh, off, buf):
    """Write `buf` at offset `off` to the file open as `fh`.

    Returns whatever the file object's `writeto` returns.  Raises
    llfuse.FUSEError with EBADF for an unknown handle or EPERM when the
    file is not writable.
    """
    _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
    if fh not in self._filehandles:
        raise llfuse.FUSEError(errno.EBADF)
    handle = self._filehandles[fh]

    if not handle.obj.writable():
        raise llfuse.FUSEError(errno.EPERM)

    self.inodes.touch(handle.obj)

    return handle.obj.writeto(off, buf, self.num_retries)
def release(self, fh):
    """Close handle `fh`: flush it, release it and drop it from the table.

    The handle is released and removed even if the flush raises; the
    exception still propagates to the caller.  The inode cache is trimmed
    afterwards, since closing a handle may allow objects to be cleared.
    """
    handle = self._filehandles.get(fh)
    if handle is not None:
        try:
            handle.flush()
        finally:
            handle.release()
            del self._filehandles[fh]
    self.inodes.inode_cache.cap_cache()
def releasedir(self, fh):
    """Close a directory handle; handled the same way as a file handle."""
    # NOTE(review): body reconstructed -- the original statement is not
    # visible in this excerpt.
    self.release(fh)
def opendir(self, inode):
    """Open the directory at `inode` and return a new handle number.

    The handle stores a snapshot of the listing ('.', '..' and the
    directory's items) taken now.  Raises llfuse.FUSEError with ENOENT for
    an unknown inode, ENOTDIR for a non-directory, or EIO when the parent
    inode is no longer known.
    """
    _logger.debug("arv-mount opendir: inode %i", inode)

    if inode not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)
    p = self.inodes[inode]

    if not isinstance(p, Directory):
        raise llfuse.FUSEError(errno.ENOTDIR)

    fh = next(self._filehandles_counter)
    if p.parent_inode not in self.inodes:
        raise llfuse.FUSEError(errno.EIO)
    parent = self.inodes[p.parent_inode]

    # update atime
    self.inodes.touch(p)

    self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
    return fh
def readdir(self, fh, off):
    """Yield directory entries of handle `fh`, starting at index `off`.

    Each item is (encoded name, attributes, next offset).  Entries whose
    inode is no longer in the inode table are skipped.  Raises
    llfuse.FUSEError(EBADF) for an unknown handle.
    """
    _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

    if fh not in self._filehandles:
        raise llfuse.FUSEError(errno.EBADF)
    handle = self._filehandles[fh]

    _logger.debug("arv-mount handle.dirobj %s", handle.obj)

    for pos in range(off, len(handle.entries)):
        name, obj = handle.entries[pos]
        if obj.inode in self.inodes:
            yield (name.encode(self.inodes.encoding), self.getattr(obj.inode), pos + 1)
def statfs(self):
    """Return file system statistics as an llfuse.StatvfsData."""
    st = llfuse.StatvfsData()
    st.f_bsize = 128 * 1024
    # NOTE(review): the remaining fields are reconstructed as zero; their
    # original assignments are not visible in this excerpt -- confirm.
    st.f_blocks = 0
    st.f_files = 0

    st.f_bfree = 0
    st.f_bavail = 0

    st.f_ffree = 0
    st.f_favail = 0

    st.f_frsize = 0
    return st
def _check_writable(self, inode_parent):
    """Return the directory at `inode_parent` if it may be modified.

    Raises llfuse.FUSEError with EROFS when the mount is read-only, ENOENT
    for an unknown inode, ENOTDIR for a non-directory, or EPERM when the
    directory itself is not writable.
    """
    if not self.enable_write:
        raise llfuse.FUSEError(errno.EROFS)

    if inode_parent not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)
    p = self.inodes[inode_parent]

    if not isinstance(p, Directory):
        raise llfuse.FUSEError(errno.ENOTDIR)

    if not p.writable():
        raise llfuse.FUSEError(errno.EPERM)

    return p
def create(self, inode_parent, name, mode, flags, ctx):
    """Create file `name` in directory `inode_parent` and open it.

    Returns (file handle number, attributes of the new file).  Errors from
    _check_writable propagate.
    """
    _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)

    p = self._check_writable(inode_parent)
    p.create(name)

    # The file entry should have been implicitly created by callback.
    f = p[name]
    fh = next(self._filehandles_counter)
    self._filehandles[fh] = FileHandle(fh, f)
    self.inodes.touch(p)

    # Returning the entry counts as a lookup, so take a reference.
    f.inc_ref()
    return (fh, self.getattr(f.inode))
def mkdir(self, inode_parent, name, mode, ctx):
    """Create directory `name` in `inode_parent`; return its attributes.

    Errors from _check_writable propagate.
    """
    _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)

    p = self._check_writable(inode_parent)
    p.mkdir(name)

    # The dir entry should have been implicitly created by callback.
    d = p[name]

    # Returning the entry counts as a lookup, so take a reference.
    d.inc_ref()
    return self.getattr(d.inode)
def unlink(self, inode_parent, name):
    """Remove file `name` from directory `inode_parent`.

    Errors from _check_writable propagate.
    """
    _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
    p = self._check_writable(inode_parent)
    p.unlink(name)
def rmdir(self, inode_parent, name):
    """Remove directory `name` from directory `inode_parent`.

    Errors from _check_writable propagate.
    """
    _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
    p = self._check_writable(inode_parent)
    p.rmdir(name)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move `name_old` in `inode_parent_old` to `name_new` in `inode_parent_new`.

    Both directories must pass _check_writable; the move itself is delegated
    to the destination directory.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    src = self._check_writable(inode_parent_old)
    dest = self._check_writable(inode_parent_new)
    dest.rename(name_old, name_new, src)
def flush(self, fh):
    """Flush the object open as `fh`; unknown handles are ignored."""
    if fh in self._filehandles:
        self._filehandles[fh].flush()
def fsync(self, fh, datasync):
    """Synchronize handle `fh` by flushing it; `datasync` is not used."""
    # NOTE(review): body reconstructed -- the original statement is not
    # visible in this excerpt.
    self.flush(fh)
656 def fsyncdir(self, fh, datasync):