1 """FUSE driver for Arvados Keep
5 There is one `Operations` object per mount point. It is the entry point for all
6 read and write requests from the llfuse module.
8 The operations object owns an `Inodes` object. The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
12 The `Inodes` object owns an `InodeCache` object. The inode cache records the
13 memory footprint of file system objects and when they are last used. When the
14 cache limit is exceeded, the least recently used objects are cleared.
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
18 File objects inherit from `fusefile.File`. Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
21 Directory objects inherit from `fusedir.Directory`. The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`. These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
28 The general FUSE operation flow is as follows:
30 - The request handler is called with either an inode or file handle that is the
31 subject of the operation.
33 - Look up the inode using the Inodes table or the file handle in the
34 filehandles table to get the file system object.
36 - For methods that alter files or directories, check that the operation is
37 valid and permitted using _check_writable().
39 - Call the relevant method on the file system object.
43 The FUSE driver supports the Arvados event bus. When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
70 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
71 from fusefile import StringFile, FuseArvadosFile
73 _logger = logging.getLogger('arvados.arvados_fuse')
75 log_handler = logging.StreamHandler()
76 llogger = logging.getLogger('llfuse')
77 llogger.addHandler(log_handler)
78 llogger.setLevel(logging.DEBUG)
81 """Connects a numeric file handle to a File or Directory object that has
82 been opened by the client."""
84 def __init__(self, fh, obj):
93 return self.obj.flush()
96 class FileHandle(Handle):
97 """Connects a numeric file handle to a File object that has
98 been opened by the client."""
class DirectoryHandle(Handle):
    """Handle for a directory that a client currently has open.

    Pairs the numeric file handle with the Directory object it refers to and
    keeps the listing that was captured for this handle.
    """

    def __init__(self, fh, dirobj, entries):
        """Record the handle number, the directory object and its listing.

        fh -- numeric file handle
        dirobj -- the directory object this handle refers to
        entries -- the listing to serve for this handle; stored unchanged on
            `self.entries`
        """
        # The base Handle initializer receives the handle number and object.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
111 class InodeCache(object):
112 """Records the memory footprint of objects and when they are last used.
114 When the cache limit is exceeded, the least recently used objects are
115 cleared. Clearing the object means discarding its contents to release
116 memory. The next time the object is accessed, it must be re-fetched from
117 the server. Note that the inode cache limit is a soft limit; the cache
118 limit may be exceeded if necessary to load very large objects, it may also
119 be exceeded if open file handles prevent objects from being cleared.
123 def __init__(self, cap, min_entries=4):
124 self._entries = collections.OrderedDict()
126 self._counter = itertools.count(0)
129 self.min_entries = min_entries
134 def _remove(self, obj, clear):
135 if clear and not obj.clear():
136 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
138 self._total -= obj.cache_size
139 del self._entries[obj.cache_priority]
141 del self._by_uuid[obj.cache_uuid]
142 obj.cache_uuid = None
144 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
148 #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
149 if self._total > self.cap:
150 for key in list(self._entries.keys()):
151 if self._total < self.cap or len(self._entries) < self.min_entries:
153 self._remove(self._entries[key], True)
155 def manage(self, obj):
157 obj.cache_priority = next(self._counter)
158 obj.cache_size = obj.objsize()
159 self._entries[obj.cache_priority] = obj
160 obj.cache_uuid = obj.uuid()
162 self._by_uuid[obj.cache_uuid] = obj
163 self._total += obj.objsize()
164 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
167 obj.cache_priority = None
169 def touch(self, obj):
171 if obj.cache_priority in self._entries:
172 self._remove(obj, False)
def unmanage(self, obj):
    """Drop `obj` from the cache, asking it to clear its contents.

    Does nothing unless `obj.persisted()` is true and `obj.cache_priority`
    is currently a key in the entries table.
    """
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    # Second argument True requests that the object be cleared as well.
    self._remove(obj, True)
def find(self, uuid):
    """Look up a cached object by uuid.

    Returns the object stored under `uuid` in the by-uuid table, or None
    when no object is registered under that uuid.
    """
    by_uuid = self._by_uuid
    return by_uuid.get(uuid, None)
182 class Inodes(object):
183 """Manage the set of inodes. This is the mapping from a numeric id
184 to a concrete File or Directory object"""
186 def __init__(self, inode_cache):
188 self._counter = itertools.count(llfuse.ROOT_INODE)
189 self.inode_cache = inode_cache
191 def __getitem__(self, item):
192 return self._entries[item]
194 def __setitem__(self, key, item):
195 self._entries[key] = item
198 return self._entries.iterkeys()
201 return self._entries.items()
203 def __contains__(self, k):
204 return k in self._entries
def touch(self, entry):
    """Mark `entry` as just used.

    Stamps the entry's `_atime` with the current time and passes the entry
    to the inode cache's own `touch`.
    """
    now = time.time()
    entry._atime = now
    cache = self.inode_cache
    cache.touch(entry)
210 def add_entry(self, entry):
211 entry.inode = next(self._counter)
212 if entry.inode == llfuse.ROOT_INODE:
214 self._entries[entry.inode] = entry
215 self.inode_cache.manage(entry)
218 def del_entry(self, entry):
219 if entry.ref_count == 0:
220 _logger.debug("Deleting inode %i", entry.inode)
221 self.inode_cache.unmanage(entry)
222 llfuse.invalidate_inode(entry.inode)
223 del self._entries[entry.inode]
227 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
230 def catch_exceptions(orig_func):
231 """Catch uncaught exceptions and log them consistently."""
233 @functools.wraps(orig_func)
234 def catch_exceptions_wrapper(self, *args, **kwargs):
236 return orig_func(self, *args, **kwargs)
237 except llfuse.FUSEError:
239 except EnvironmentError as e:
240 raise llfuse.FUSEError(e.errno)
242 _logger.exception("Unhandled exception during FUSE operation")
243 raise llfuse.FUSEError(errno.EIO)
245 return catch_exceptions_wrapper
248 class Operations(llfuse.Operations):
249 """This is the main interface with llfuse.
251 The methods on this object are called by llfuse threads to service FUSE
252 events to query and read from the file system.
254 llfuse has its own global lock which is acquired before calling a request handler,
255 so request handlers do not run concurrently unless the lock is explicitly released
256 using 'with llfuse.lock_released:'
260 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
261 super(Operations, self).__init__()
264 inode_cache = InodeCache(cap=256*1024*1024)
265 self.inodes = Inodes(inode_cache)
268 self.encoding = encoding
270 # dict of inode to filehandle
271 self._filehandles = {}
272 self._filehandles_counter = itertools.count(0)
274 # Other threads that need to wait until the fuse driver
275 # is fully initialized should wait() on this event object.
276 self.initlock = threading.Event()
278 self.num_retries = num_retries
283 # Allow threads that are waiting for the driver to be finished
284 # initializing to continue
291 def access(self, inode, mode, ctx):
294 def listen_for_events(self, api_client):
295 self.event = arvados.events.subscribe(api_client,
296 [["event_type", "in", ["create", "update", "delete"]]],
299 def on_event(self, ev):
300 if 'event_type' in ev:
302 item = self.inodes.inode_cache.find(ev["object_uuid"])
305 if ev["object_kind"] == "arvados#collection":
306 item.update(to_pdh=ev.get("properties", {}).get("new_attributes", {}).get("portable_data_hash"))
310 oldowner = ev.get("properties", {}).get("old_attributes", {}).get("owner_uuid")
311 olditemparent = self.inodes.inode_cache.find(oldowner)
312 if olditemparent is not None:
313 olditemparent.invalidate()
314 olditemparent.update()
316 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
317 if itemparent is not None:
318 itemparent.invalidate()
322 def getattr(self, inode):
323 if inode not in self.inodes:
324 raise llfuse.FUSEError(errno.ENOENT)
326 e = self.inodes[inode]
328 entry = llfuse.EntryAttributes()
331 entry.entry_timeout = 300
332 entry.attr_timeout = 300
334 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
335 if isinstance(e, Directory):
336 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
338 entry.st_mode |= stat.S_IFREG
339 if isinstance(e, FuseArvadosFile):
340 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
343 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
346 entry.st_uid = self.uid
347 entry.st_gid = self.gid
350 entry.st_size = e.size()
352 entry.st_blksize = 512
353 entry.st_blocks = (e.size()/512)+1
354 entry.st_atime = int(e.atime())
355 entry.st_mtime = int(e.mtime())
356 entry.st_ctime = int(e.mtime())
361 def setattr(self, inode, attr):
362 entry = self.getattr(inode)
364 e = self.inodes[inode]
366 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
367 with llfuse.lock_released:
368 e.arvfile.truncate(attr.st_size)
369 entry.st_size = e.arvfile.size()
374 def lookup(self, parent_inode, name):
375 name = unicode(name, self.encoding)
381 if parent_inode in self.inodes:
382 p = self.inodes[parent_inode]
384 inode = p.parent_inode
385 elif isinstance(p, Directory) and name in p:
386 inode = p[name].inode
389 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
390 parent_inode, name, inode)
391 self.inodes[inode].inc_ref()
392 return self.getattr(inode)
394 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
396 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release lookup references on a batch of inodes.

    inodes -- iterable of (inode, nlookup) pairs

    For each pair the entry's reference count is decremented by `nlookup`
    through `dec_ref`. When `dec_ref` returns 0 and the entry is flagged
    `dead`, the entry is removed via `self.inodes.del_entry`.
    """
    for inode_num, lookup_count in inodes:
        entry = self.inodes[inode_num]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i",
                      inode_num, lookup_count, entry.ref_count)
        remaining = entry.dec_ref(lookup_count)
        if remaining != 0:
            continue
        # Only entries already marked dead are deleted once unreferenced.
        if entry.dead:
            self.inodes.del_entry(entry)
407 def open(self, inode, flags):
408 if inode in self.inodes:
409 p = self.inodes[inode]
411 raise llfuse.FUSEError(errno.ENOENT)
413 if isinstance(p, Directory):
414 raise llfuse.FUSEError(errno.EISDIR)
416 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
417 raise llfuse.FUSEError(errno.EPERM)
419 fh = next(self._filehandles_counter)
420 self._filehandles[fh] = FileHandle(fh, p)
425 def read(self, fh, off, size):
426 _logger.debug("arv-mount read %i %i %i", fh, off, size)
427 if fh in self._filehandles:
428 handle = self._filehandles[fh]
430 raise llfuse.FUSEError(errno.EBADF)
432 self.inodes.touch(handle.obj)
435 return handle.obj.readfrom(off, size, self.num_retries)
436 except arvados.errors.NotFoundError as e:
437 _logger.error("Block not found: " + str(e))
438 raise llfuse.FUSEError(errno.EIO)
441 def write(self, fh, off, buf):
442 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
443 if fh in self._filehandles:
444 handle = self._filehandles[fh]
446 raise llfuse.FUSEError(errno.EBADF)
448 if not handle.obj.writable():
449 raise llfuse.FUSEError(errno.EPERM)
451 self.inodes.touch(handle.obj)
453 return handle.obj.writeto(off, buf, self.num_retries)
456 def release(self, fh):
457 if fh in self._filehandles:
459 self._filehandles[fh].flush()
460 except EnvironmentError as e:
461 raise llfuse.FUSEError(e.errno)
463 _logger.exception("Flush error")
464 self._filehandles[fh].release()
465 del self._filehandles[fh]
466 self.inodes.inode_cache.cap_cache()
468 def releasedir(self, fh):
472 def opendir(self, inode):
473 _logger.debug("arv-mount opendir: inode %i", inode)
475 if inode in self.inodes:
476 p = self.inodes[inode]
478 raise llfuse.FUSEError(errno.ENOENT)
480 if not isinstance(p, Directory):
481 raise llfuse.FUSEError(errno.ENOTDIR)
483 fh = next(self._filehandles_counter)
484 if p.parent_inode in self.inodes:
485 parent = self.inodes[p.parent_inode]
487 raise llfuse.FUSEError(errno.EIO)
492 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
496 def readdir(self, fh, off):
497 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
499 if fh in self._filehandles:
500 handle = self._filehandles[fh]
502 raise llfuse.FUSEError(errno.EBADF)
504 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
507 while e < len(handle.entries):
508 if handle.entries[e][1].inode in self.inodes:
510 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
511 except UnicodeEncodeError:
517 st = llfuse.StatvfsData()
518 st.f_bsize = 128 * 1024
531 def _check_writable(self, inode_parent):
532 if inode_parent in self.inodes:
533 p = self.inodes[inode_parent]
535 raise llfuse.FUSEError(errno.ENOENT)
537 if not isinstance(p, Directory):
538 raise llfuse.FUSEError(errno.ENOTDIR)
541 raise llfuse.FUSEError(errno.EPERM)
546 def create(self, inode_parent, name, mode, flags, ctx):
547 p = self._check_writable(inode_parent)
550 # The file entry should have been implicitly created by callback.
552 fh = next(self._filehandles_counter)
553 self._filehandles[fh] = FileHandle(fh, f)
557 return (fh, self.getattr(f.inode))
560 def mkdir(self, inode_parent, name, mode, ctx):
561 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
563 p = self._check_writable(inode_parent)
566 # The dir entry should have been implicitly created by callback.
570 return self.getattr(d.inode)
573 def unlink(self, inode_parent, name):
574 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
575 p = self._check_writable(inode_parent)
579 def rmdir(self, inode_parent, name):
580 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
581 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move the entry `name_old` in one directory to `name_new` in another.

    Both parent inodes are passed through `_check_writable` (old parent
    first), which raises `llfuse.FUSEError` when its check fails. The actual
    move is delegated to the destination's `rename` method, which receives
    the old name, the new name and the source parent.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'",
                  inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
593 if fh in self._filehandles:
594 self._filehandles[fh].flush()
596 def fsync(self, fh, datasync):
599 def fsyncdir(self, fh, datasync):