X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/249e9001502b26d875c34c9e7d12b4a27e367c14..d8d82841f43394b3781804844d4860bc8205d5fc:/services/fuse/arvados_fuse/__init__.py?ds=inline

NOTE(review): this patch was recovered from a copy whose line breaks and
indentation had been collapsed. The line counts of every hunk below were
checked against its @@ header, but Python indentation is inferred -- in
particular the depth of `self.manage(obj)` in ObjectCache.touch and of
`self.inodes.cap_cache()` in Operations.release -- and runs of spaces inside
context lines may differ from upstream. Confirm against the X-Git-Url above
before applying.

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a886e37f62..deb0cd35a3 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -21,6 +21,7 @@ import calendar
 import threading
 import itertools
 import ciso8601
+import collections
 
 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
 from fusefile import StreamReaderFile
@@ -29,21 +30,71 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 
 
 class FileHandle(object):
-    """Connects a numeric file handle to a File or Directory object that has
+    """Connects a numeric file handle to a File object that has
     been opened by the client."""
 
-    def __init__(self, fh, entry):
+    def __init__(self, fh, fileobj):
         self.fh = fh
-        self.entry = entry
+        self.fileobj = fileobj
+        self.fileobj.inc_use()
+
+    def release(self):
+        self.fileobj.dec_use()
+
+
+class DirectoryHandle(object):
+    """Connects a numeric file handle to a Directory object that has
+    been opened by the client."""
+
+    def __init__(self, fh, dirobj, entries):
+        self.fh = fh
+        self.entries = entries
+        self.dirobj = dirobj
+        self.dirobj.inc_use()
+
+    def release(self):
+        self.dirobj.dec_use()
+
+
+class ObjectCache(object):
+    def __init__(self, cap):
+        self._entries = collections.OrderedDict()
+        self._counter = itertools.count(1)
+        self.cap = cap
+
+    def cap_cache(self):
+        if len(self._entries) > self.cap:
+            ent = iter(self._entries)
+            ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)]
+            for key in ents:
+                capobj = self._entries[key]
+                if capobj.clear():
+                    del self._entries[key]
+
+    def manage(self, obj):
+        obj._cache_priority = next(self._counter)
+        self._entries[obj._cache_priority] = obj
+        self.cap_cache()
+
+    def touch(self, obj):
+        if obj._cache_priority in self._entries:
+            del self._entries[obj._cache_priority]
+        self.manage(obj)
+
+    def unmanage(self, obj):
+        if obj._cache_priority in self._entries:
+            if obj.clear():
+                del self._entries[obj._cache_priority]
 
 
 class Inodes(object):
     """Manage the set of inodes. This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self):
+    def __init__(self, cache_cap=1000):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
+        self._obj_cache = ObjectCache(cap=cache_cap)
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -60,12 +111,20 @@ class Inodes(object):
     def __contains__(self, k):
         return k in self._entries
 
+    def touch(self, entry):
+        self._obj_cache.touch(entry)
+
+    def cap_cache(self):
+        self._obj_cache.cap_cache()
+
     def add_entry(self, entry):
         entry.inode = next(self._counter)
         self._entries[entry.inode] = entry
+        self._obj_cache.manage(entry)
         return entry
 
     def del_entry(self, entry):
+        self._obj_cache.unmanage(entry)
         llfuse.invalidate_inode(entry.inode)
         del self._entries[entry.inode]
 
@@ -82,10 +141,10 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, encoding="utf-8"):
+    def __init__(self, uid, gid, encoding="utf-8", cache_cap=1000):
         super(Operations, self).__init__()
 
-        self.inodes = Inodes()
+        self.inodes = Inodes(cache_cap)
         self.uid = uid
         self.gid = gid
         self.encoding = encoding
@@ -187,11 +246,11 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.EBADF)
 
         # update atime
-        handle.entry._atime = time.time()
+        handle.fileobj._atime = time.time()
 
         try:
             with llfuse.lock_released:
-                return handle.entry.readfrom(off, size)
+                return handle.fileobj.readfrom(off, size)
         except arvados.errors.NotFoundError as e:
             _logger.warning("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
@@ -201,7 +260,12 @@ class Operations(llfuse.Operations):
 
     def release(self, fh):
         if fh in self._filehandles:
+            self._filehandles[fh].release()
             del self._filehandles[fh]
+        self.inodes.cap_cache()
+
+    def releasedir(self, fh):
+        self.release(fh)
 
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
@@ -224,8 +288,13 @@ class Operations(llfuse.Operations):
 
         # update atime
         p._atime = time.time()
-        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
-        return fh
+        try:
+            p.inc_use()
+            self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+            return fh
+        finally:
+            p.dec_use()
+
 
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -235,20 +304,17 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        _logger.debug("arv-mount handle.entry %s", handle.entry)
+        _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
 
         e = off
-        while e < len(handle.entry):
-            if handle.entry[e][1].inode in self.inodes:
+        while e < len(handle.entries):
+            if handle.entries[e][1].inode in self.inodes:
                 try:
-                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
+                    yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
                 except UnicodeEncodeError:
                     pass
             e += 1
 
-    def releasedir(self, fh):
-        del self._filehandles[fh]
-
     def statfs(self):
         st = llfuse.StatvfsData()
         st.f_bsize = 64 * 1024