1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
Directory objects inherit from `fusedir.Directory`. The directory object wraps
a Python dict which stores the mapping from filenames to directory entries.
Directory contents can be accessed through the Python operators such as `[]`
and `in`. These methods automatically check if the directory is fresh (up to
date) or stale (needs update) and will call `update` if necessary before
returning a result.
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
# Default _notify_queue has a limit of 1000 items, but it really needs to be
# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
# details.
77 llfuse.capi._notify_queue = Queue.Queue()
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
82 _logger = logging.getLogger('arvados.arvados_fuse')
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
91 """Connects a numeric file handle to a File or Directory object that has
92 been opened by the client."""
94 def __init__(self, fh, obj):
103 if self.obj.writable():
104 return self.obj.flush()
107 class FileHandle(Handle):
108 """Connects a numeric file handle to a File object that has
109 been opened by the client."""
class DirectoryHandle(Handle):
    """Handle for a directory that the client has opened.

    Ties the numeric file handle to its Directory object and additionally
    keeps the entry list that was supplied when the handle was created.
    """

    def __init__(self, fh, dirobj, entries):
        """Set up the base handle, then remember the directory listing.

        fh      -- numeric file handle.
        dirobj  -- the Directory object being opened.
        entries -- the listing to serve for this handle; stored unchanged
                   as `self.entries`.
        """
        # The base class receives the handle number and the directory object.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
122 class InodeCache(object):
123 """Records the memory footprint of objects and when they are last used.
125 When the cache limit is exceeded, the least recently used objects are
126 cleared. Clearing the object means discarding its contents to release
127 memory. The next time the object is accessed, it must be re-fetched from
128 the server. Note that the inode cache limit is a soft limit; the cache
129 limit may be exceeded if necessary to load very large objects, it may also
130 be exceeded if open file handles prevent objects from being cleared.
134 def __init__(self, cap, min_entries=4):
135 self._entries = collections.OrderedDict()
137 self._counter = itertools.count(0)
140 self.min_entries = min_entries
145 def _remove(self, obj, clear):
146 if clear and not obj.clear():
147 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
149 self._total -= obj.cache_size
150 del self._entries[obj.cache_priority]
152 self._by_uuid[obj.cache_uuid].remove(obj)
153 if not self._by_uuid[obj.cache_uuid]:
154 del self._by_uuid[obj.cache_uuid]
155 obj.cache_uuid = None
157 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
161 if self._total > self.cap:
162 for key in list(self._entries.keys()):
163 if self._total < self.cap or len(self._entries) < self.min_entries:
165 self._remove(self._entries[key], True)
167 def manage(self, obj):
169 obj.cache_priority = next(self._counter)
170 obj.cache_size = obj.objsize()
171 self._entries[obj.cache_priority] = obj
172 obj.cache_uuid = obj.uuid()
174 if obj.cache_uuid not in self._by_uuid:
175 self._by_uuid[obj.cache_uuid] = [obj]
177 if obj not in self._by_uuid[obj.cache_uuid]:
178 self._by_uuid[obj.cache_uuid].append(obj)
179 self._total += obj.objsize()
180 _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
183 obj.cache_priority = None
185 def touch(self, obj):
187 if obj.cache_priority in self._entries:
188 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking `obj` in the cache and clear it.

    Nothing happens unless `obj.persisted()` is true and the object's
    `cache_priority` is currently a key of the entry table. When both hold,
    the object is handed to `_remove()` with clearing requested.
    """
    # persisted() is consulted first; cache_priority is only read when it
    # returns a true value.
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    self._remove(obj, True)
def find_by_uuid(self, uuid):
    """Return the cached objects registered under `uuid`.

    Returns a new list on every call; it is empty when nothing is
    registered under `uuid`. Looking up an unknown uuid does not add a key
    to the index.

    A snapshot is returned rather than the index's own list because the
    cache edits that list in place when objects are removed or re-managed.
    Callers that iterate over the result while triggering such updates
    would otherwise skip or revisit entries, and could also corrupt the
    index by mutating the returned list.
    """
    return list(self._by_uuid.get(uuid, ()))
199 self._entries.clear()
200 self._by_uuid.clear()
203 class Inodes(object):
204 """Manage the set of inodes. This is the mapping from a numeric id
205 to a concrete File or Directory object"""
207 def __init__(self, inode_cache, encoding="utf-8"):
209 self._counter = itertools.count(llfuse.ROOT_INODE)
210 self.inode_cache = inode_cache
211 self.encoding = encoding
212 self.deferred_invalidations = []
214 def __getitem__(self, item):
215 return self._entries[item]
217 def __setitem__(self, key, item):
218 self._entries[key] = item
221 return self._entries.iterkeys()
224 return self._entries.items()
226 def __contains__(self, k):
227 return k in self._entries
def touch(self, entry):
    """Record that `entry` has just been used.

    Sets the entry's `_atime` to the current time from `time.time()`, then
    passes the entry to the inode cache's `touch()`.
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
233 def add_entry(self, entry):
234 entry.inode = next(self._counter)
235 if entry.inode == llfuse.ROOT_INODE:
237 self._entries[entry.inode] = entry
238 self.inode_cache.manage(entry)
241 def del_entry(self, entry):
242 if entry.ref_count == 0:
243 self.inode_cache.unmanage(entry)
244 del self._entries[entry.inode]
245 with llfuse.lock_released:
247 self.invalidate_inode(entry.inode)
251 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
    """Pass an invalidation request for `inode` to llfuse.

    Thin wrapper around `llfuse.invalidate_inode`; nothing is returned.
    """
    invalidate = llfuse.invalidate_inode
    invalidate(inode)
def invalidate_entry(self, inode, name):
    """Pass an invalidation request for entry `name` under `inode` to llfuse.

    Thin wrapper around `llfuse.invalidate_entry`; nothing is returned.
    """
    invalidate = llfuse.invalidate_entry
    invalidate(inode, name)
260 self.inode_cache.clear()
262 for k,v in self._entries.items():
265 except Exception as e:
266 _logger.exception("Error during finalize of inode %i", k)
268 self._entries.clear()
271 def catch_exceptions(orig_func):
272 """Catch uncaught exceptions and log them consistently."""
274 @functools.wraps(orig_func)
275 def catch_exceptions_wrapper(self, *args, **kwargs):
277 return orig_func(self, *args, **kwargs)
278 except llfuse.FUSEError:
280 except EnvironmentError as e:
281 raise llfuse.FUSEError(e.errno)
282 except arvados.errors.KeepWriteError as e:
283 _logger.error("Keep write error: " + str(e))
284 raise llfuse.FUSEError(errno.EIO)
285 except arvados.errors.NotFoundError as e:
286 _logger.error("Block not found error: " + str(e))
287 raise llfuse.FUSEError(errno.EIO)
289 _logger.exception("Unhandled exception during FUSE operation")
290 raise llfuse.FUSEError(errno.EIO)
292 return catch_exceptions_wrapper
295 class Operations(llfuse.Operations):
296 """This is the main interface with llfuse.
298 The methods on this object are called by llfuse threads to service FUSE
299 events to query and read from the file system.
301 llfuse has its own global lock which is acquired before calling a request handler,
302 so request handlers do not run concurrently unless the lock is explicitly released
303 using 'with llfuse.lock_released:'
307 def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
308 super(Operations, self).__init__()
310 self._api_client = api_client
313 inode_cache = InodeCache(cap=256*1024*1024)
314 self.inodes = Inodes(inode_cache, encoding=encoding)
317 self.enable_write = enable_write
# dict of numeric file handle (fh) to its Handle object (FileHandle or DirectoryHandle)
320 self._filehandles = {}
321 self._filehandles_counter = itertools.count(0)
323 # Other threads that need to wait until the fuse driver
324 # is fully initialized should wait() on this event object.
325 self.initlock = threading.Event()
327 # If we get overlapping shutdown events (e.g., fusermount -u
328 # -z and operations.destroy()) llfuse calls forget() on inodes
329 # that have already been deleted. To avoid this, we make
330 # forget() a no-op if called after destroy().
331 self._shutdown_started = threading.Event()
333 self.num_retries = num_retries
335 self.read_counter = arvados.keep.Counter()
336 self.write_counter = arvados.keep.Counter()
337 self.read_ops_counter = arvados.keep.Counter()
338 self.write_ops_counter = arvados.keep.Counter()
343 # Allow threads that are waiting for the driver to be finished
344 # initializing to continue
350 self._shutdown_started.set()
357 def access(self, inode, mode, ctx):
360 def listen_for_events(self):
361 self.events = arvados.events.subscribe(self._api_client,
362 [["event_type", "in", ["create", "update", "delete"]]],
366 def on_event(self, ev):
367 if 'event_type' not in ev:
370 for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
372 if ev["object_kind"] == "arvados#collection":
373 new_attr = (ev.get("properties") and
374 ev["properties"].get("new_attributes") and
375 ev["properties"]["new_attributes"])
377 # new_attributes.modified_at currently lacks
378 # subsecond precision (see #6347) so use event_at
379 # which should always be the same.
381 (ev["event_at"], new_attr["portable_data_hash"])
382 if new_attr else None)
384 item.update(to_record_version=record_version)
389 ev.get("properties") and
390 ev["properties"].get("old_attributes") and
391 ev["properties"]["old_attributes"].get("owner_uuid"))
392 newowner = ev["object_owner_uuid"]
394 self.inodes.inode_cache.find_by_uuid(oldowner) +
395 self.inodes.inode_cache.find_by_uuid(newowner)):
401 def getattr(self, inode):
402 if inode not in self.inodes:
403 raise llfuse.FUSEError(errno.ENOENT)
405 e = self.inodes[inode]
407 entry = llfuse.EntryAttributes()
410 entry.entry_timeout = 60 if e.allow_dirent_cache else 0
411 entry.attr_timeout = 60 if e.allow_attr_cache else 0
413 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
414 if isinstance(e, Directory):
415 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
417 entry.st_mode |= stat.S_IFREG
418 if isinstance(e, FuseArvadosFile):
419 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
421 if self.enable_write and e.writable():
422 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
425 entry.st_uid = self.uid
426 entry.st_gid = self.gid
429 entry.st_size = e.size()
431 entry.st_blksize = 512
432 entry.st_blocks = (entry.st_size/512)+1
433 entry.st_atime = int(e.atime())
434 entry.st_mtime = int(e.mtime())
435 entry.st_ctime = int(e.mtime())
440 def setattr(self, inode, attr):
441 entry = self.getattr(inode)
443 e = self.inodes[inode]
445 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
446 with llfuse.lock_released:
447 e.arvfile.truncate(attr.st_size)
448 entry.st_size = e.arvfile.size()
453 def lookup(self, parent_inode, name):
454 name = unicode(name, self.inodes.encoding)
460 if parent_inode in self.inodes:
461 p = self.inodes[parent_inode]
464 inode = p.parent_inode
465 elif isinstance(p, Directory) and name in p:
466 inode = p[name].inode
469 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
470 parent_inode, name, inode)
471 self.inodes[inode].inc_ref()
472 return self.getattr(inode)
474 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
476 raise llfuse.FUSEError(errno.ENOENT)
479 def forget(self, inodes):
480 if self._shutdown_started.is_set():
482 for inode, nlookup in inodes:
483 ent = self.inodes[inode]
484 _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
485 if ent.dec_ref(nlookup) == 0 and ent.dead:
486 self.inodes.del_entry(ent)
489 def open(self, inode, flags):
490 if inode in self.inodes:
491 p = self.inodes[inode]
493 raise llfuse.FUSEError(errno.ENOENT)
495 if isinstance(p, Directory):
496 raise llfuse.FUSEError(errno.EISDIR)
498 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
499 raise llfuse.FUSEError(errno.EPERM)
501 fh = next(self._filehandles_counter)
502 self._filehandles[fh] = FileHandle(fh, p)
505 _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
510 def read(self, fh, off, size):
511 _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
512 self.read_ops_counter.add(1)
514 if fh in self._filehandles:
515 handle = self._filehandles[fh]
517 raise llfuse.FUSEError(errno.EBADF)
519 self.inodes.touch(handle.obj)
521 r = handle.obj.readfrom(off, size, self.num_retries)
523 self.read_counter.add(len(r))
527 def write(self, fh, off, buf):
528 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
529 self.write_ops_counter.add(1)
531 if fh in self._filehandles:
532 handle = self._filehandles[fh]
534 raise llfuse.FUSEError(errno.EBADF)
536 if not handle.obj.writable():
537 raise llfuse.FUSEError(errno.EPERM)
539 self.inodes.touch(handle.obj)
541 w = handle.obj.writeto(off, buf, self.num_retries)
543 self.write_counter.add(w)
547 def release(self, fh):
548 if fh in self._filehandles:
550 self._filehandles[fh].flush()
554 self._filehandles[fh].release()
555 del self._filehandles[fh]
556 self.inodes.inode_cache.cap_cache()
558 def releasedir(self, fh):
562 def opendir(self, inode):
563 _logger.debug("arv-mount opendir: inode %i", inode)
565 if inode in self.inodes:
566 p = self.inodes[inode]
568 raise llfuse.FUSEError(errno.ENOENT)
570 if not isinstance(p, Directory):
571 raise llfuse.FUSEError(errno.ENOTDIR)
573 fh = next(self._filehandles_counter)
574 if p.parent_inode in self.inodes:
575 parent = self.inodes[p.parent_inode]
577 raise llfuse.FUSEError(errno.EIO)
582 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
586 def readdir(self, fh, off):
587 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
589 if fh in self._filehandles:
590 handle = self._filehandles[fh]
592 raise llfuse.FUSEError(errno.EBADF)
595 while e < len(handle.entries):
596 if handle.entries[e][1].inode in self.inodes:
597 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
602 st = llfuse.StatvfsData()
603 st.f_bsize = 128 * 1024
616 def _check_writable(self, inode_parent):
617 if not self.enable_write:
618 raise llfuse.FUSEError(errno.EROFS)
620 if inode_parent in self.inodes:
621 p = self.inodes[inode_parent]
623 raise llfuse.FUSEError(errno.ENOENT)
625 if not isinstance(p, Directory):
626 raise llfuse.FUSEError(errno.ENOTDIR)
629 raise llfuse.FUSEError(errno.EPERM)
634 def create(self, inode_parent, name, mode, flags, ctx):
635 _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
637 p = self._check_writable(inode_parent)
640 # The file entry should have been implicitly created by callback.
642 fh = next(self._filehandles_counter)
643 self._filehandles[fh] = FileHandle(fh, f)
647 return (fh, self.getattr(f.inode))
650 def mkdir(self, inode_parent, name, mode, ctx):
651 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
653 p = self._check_writable(inode_parent)
656 # The dir entry should have been implicitly created by callback.
660 return self.getattr(d.inode)
663 def unlink(self, inode_parent, name):
664 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
665 p = self._check_writable(inode_parent)
669 def rmdir(self, inode_parent, name):
670 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
671 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Rename `name_old` under one parent inode to `name_new` under another.

    Both parent inodes go through `_check_writable()`, the old parent
    first and the new parent second; any error it raises propagates. The
    rename is then delegated to the `rename()` method of the object
    obtained for the new parent, which is also given the object obtained
    for the old parent.
    """
    _logger.debug(
        "arv-mount rename: %i '%s' %i '%s'",
        inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
683 if fh in self._filehandles:
684 self._filehandles[fh].flush()
686 def fsync(self, fh, datasync):
689 def fsyncdir(self, fh, datasync):