2 # FUSE driver for Arvados Keep
26 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
27 from fusefile import StreamReaderFile, StringFile
# Logger shared by every class in this module; records are emitted on the
# 'arvados.arvados_fuse' channel.
_logger = logging.getLogger("arvados.arvados_fuse")
class FileHandle(object):
    """Connects a numeric file handle to a File object that has
    been opened by the client.

    Creating the handle increments the file's use count (inc_use());
    release() decrements it again (dec_use()).
    """

    def __init__(self, fh, fileobj):
        # fh is the numeric handle number handed back to llfuse.
        self.fh = fh
        self.fileobj = fileobj
        self.fileobj.inc_use()

    def release(self):
        """Drop the use count taken when this handle was created."""
        self.fileobj.dec_use()
class DirectoryHandle(object):
    """Connects a numeric file handle to a Directory object that has
    been opened by the client.

    ``entries`` is the list of (name, object) pairs captured when the
    directory was opened; readdir() pages through it by index.  Creating
    the handle increments the directory's use count (inc_use()); release()
    decrements it again (dec_use()).
    """

    def __init__(self, fh, dirobj, entries):
        # fh is the numeric handle number handed back to llfuse.
        self.fh = fh
        self.entries = entries
        self.dirobj = dirobj
        self.dirobj.inc_use()

    def release(self):
        """Drop the use count taken when this handle was created."""
        self.dirobj.dec_use()
class InodeCache(object):
    """Size-capped least-recently-used cache of inode objects.

    Managed objects are kept in an OrderedDict keyed by a strictly
    increasing priority number, so iteration order is least recently
    managed/touched first.  Each object records its own ``cache_priority``
    and ``cache_size`` (its objsize() when it was last managed).  Whenever
    the summed size exceeds ``cap``, the oldest objects are asked to
    clear() themselves until the total is within ``cap`` or only
    ``min_entries`` objects remain.  Objects whose persisted() is false
    are never tracked.
    """

    def __init__(self, cap, min_entries=4):
        self._entries = collections.OrderedDict()
        self._counter = itertools.count(1)
        self.cap = cap
        self._total = 0
        self.min_entries = min_entries

    def total(self):
        """Return the summed cache_size of all currently managed objects."""
        return self._total

    def _remove(self, obj, clear):
        """Stop tracking obj, optionally asking it to clear() first.

        Returns False, leaving obj tracked, when ``clear`` is requested and
        obj.clear() refuses; otherwise drops obj and returns True.
        """
        if clear and not obj.clear():
            _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
            return False
        self._total -= obj.cache_size
        del self._entries[obj.cache_priority]
        _logger.debug("Cleared %s total now %i", obj, self._total)
        return True

    def cap_cache(self):
        """Evict least-recently-used objects while the total exceeds cap."""
        _logger.debug("total is %i cap is %i", self._total, self.cap)
        if self._total > self.cap:
            # Iterate over a snapshot: _remove() mutates _entries as we go.
            for key in list(self._entries):
                # Stop once we are within the cap, or when evicting another
                # object would leave fewer than min_entries behind.
                if self._total <= self.cap or len(self._entries) <= self.min_entries:
                    break
                obj = self._entries.get(key)
                if obj is None:
                    # Already dropped while clearing an earlier entry
                    # (presumably clear() reached unmanage()); skip it.
                    continue
                self._remove(obj, True)

    def manage(self, obj):
        """Start tracking obj as the most recently used entry."""
        if obj.persisted():
            obj.cache_priority = next(self._counter)
            obj.cache_size = obj.objsize()
            self._entries[obj.cache_priority] = obj
            # Add exactly what _remove() will later subtract.
            self._total += obj.cache_size
            _logger.debug("Managing %s total now %i", obj, self._total)
            self.cap_cache()

    def touch(self, obj):
        """Move obj to the most-recently-used position, re-reading its size."""
        if obj.persisted():
            if obj.cache_priority in self._entries:
                self._remove(obj, False)
            self.manage(obj)
            _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)

    def unmanage(self, obj):
        """Stop tracking obj, asking it to clear() itself first."""
        if obj.persisted() and obj.cache_priority in self._entries:
            self._remove(obj, True)
class Inodes(object):
    """Manage the set of inodes.  This is the mapping from a numeric id
    to a concrete File or Directory object.

    Every registered entry is also handed to ``inode_cache`` (manage on
    add, touch on access, unmanage on delete).
    """

    def __init__(self, inode_cache):
        self._entries = {}
        # Inode numbers are handed out sequentially starting at llfuse's
        # root inode, so the first entry added becomes the root.
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.inode_cache = inode_cache

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        # iter() yields the inode numbers on both Python 2 and 3,
        # unlike the Python-2-only dict.iterkeys().
        return iter(self._entries)

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def touch(self, entry):
        """Record an access: update entry's atime and its cache position."""
        entry._atime = time.time()
        self.inode_cache.touch(entry)

    def add_entry(self, entry):
        """Assign entry the next inode number, register it, and return it."""
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        self.inode_cache.manage(entry)
        return entry

    def del_entry(self, entry):
        """Forget entry: drop it from the cache, invalidate it via llfuse,
        and remove it from the inode table."""
        self.inode_cache.unmanage(entry)
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]
class Operations(llfuse.Operations):
    """This is the main interface with llfuse.

    The methods on this object are called by llfuse threads to service FUSE
    events to query and read from the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using 'with llfuse.lock_released:'

    """

    def __init__(self, uid, gid, encoding="utf-8", inode_cache=None):
        """Create the llfuse operations object.

        uid/gid are reported as the owner of every entry.  encoding is used
        to decode names passed to lookup() and to encode names yielded by
        readdir().  inode_cache defaults to an InodeCache with cap
        256*1024*1024 (in the same units as the objects' objsize()).
        """
        super(Operations, self).__init__()

        if inode_cache is None:
            inode_cache = InodeCache(cap=256*1024*1024)
        self.inodes = Inodes(inode_cache)
        self.uid = uid
        self.gid = gid
        self.encoding = encoding

        # dict of numeric file handle -> FileHandle/DirectoryHandle, as
        # allocated by open() and opendir()
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()

    def access(self, inode, mode, ctx):
        # Always grant access.
        return True

    def getattr(self, inode):
        """Return llfuse.EntryAttributes for inode, or raise ENOENT."""
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        # Everything is read-only; directories and StreamReaderFiles are
        # additionally marked executable.
        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        elif isinstance(e, StreamReaderFile):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
        else:
            entry.st_mode |= stat.S_IFREG

        entry.st_nlink = 1
        entry.st_uid = self.uid
        entry.st_gid = self.gid
        entry.st_rdev = 0

        size = e.size()
        entry.st_size = size

        entry.st_blksize = 512
        # st_blocks counts 512-byte units: round up so a partial block
        # counts once and an exact multiple gets no spurious extra block.
        # Integer division keeps the value an int on Python 2 and 3.
        entry.st_blocks = (size + 511) // 512
        entry.st_atime = int(e.atime())
        entry.st_mtime = int(e.mtime())
        entry.st_ctime = int(e.mtime())

        return entry

    def lookup(self, parent_inode, name):
        """Resolve name inside parent_inode and return its attributes.

        Raises FUSEError(ENOENT) when the name cannot be resolved.
        """
        name = unicode(name, self.encoding)
        _logger.debug("arv-mount lookup: parent_inode %i name %s",
                      parent_inode, name)
        inode = None

        if name == '.':
            inode = parent_inode
        elif parent_inode in self.inodes:
            p = self.inodes[parent_inode]
            if name == '..':
                inode = p.parent_inode
            elif isinstance(p, Directory) and name in p:
                inode = p[name].inode

        if inode is None:
            raise llfuse.FUSEError(errno.ENOENT)
        return self.getattr(inode)

    def open(self, inode, flags):
        """Open a file read-only and return a new numeric file handle.

        Raises ENOENT for unknown inodes, EROFS for write access and
        EISDIR for directories.
        """
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
            raise llfuse.FUSEError(errno.EROFS)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        self._filehandles[fh] = FileHandle(fh, p)
        self.inodes.touch(p)
        return fh

    def read(self, fh, off, size):
        """Read up to size bytes at offset off from open file handle fh.

        Raises EBADF for unknown handles and EIO on any read failure.
        """
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        self.inodes.touch(handle.fileobj)

        try:
            # Release the llfuse global lock while fetching data so other
            # requests can be serviced meanwhile.
            with llfuse.lock_released:
                return handle.fileobj.readfrom(off, size)
        except arvados.errors.NotFoundError as e:
            _logger.warning("Block not found: %s", e)
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            _logger.exception("Read error")
            raise llfuse.FUSEError(errno.EIO)

    def release(self, fh):
        """Close handle fh (if known), then trim the inode cache."""
        if fh in self._filehandles:
            self._filehandles[fh].release()
            del self._filehandles[fh]
        self.inodes.inode_cache.cap_cache()

    def releasedir(self, fh):
        self.release(fh)

    def opendir(self, inode):
        """Open a directory and return a new numeric handle for readdir().

        Raises ENOENT for unknown inodes, ENOTDIR for non-directories and
        EIO when the directory's parent inode is not registered.
        """
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        # update atime
        self.inodes.touch(p)

        # Allocate the handle number only once the open is known to succeed.
        fh = self._filehandles_counter
        self._filehandles_counter += 1
        # Snapshot the listing now; readdir() pages through this list.
        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
        return fh

    def readdir(self, fh, off):
        """Yield (encoded name, attributes, next offset) for entries of fh.

        ``off`` is the index of the first entry to return; each yielded
        tuple carries the index to resume from.
        """
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)

        for e in range(off, len(handle.entries)):
            name, obj = handle.entries[e]
            # Skip entries no longer present in the inode table.
            if obj.inode not in self.inodes:
                continue
            try:
                yield (name.encode(self.encoding), self.getattr(obj.inode), e + 1)
            except UnicodeEncodeError:
                # Names not representable in self.encoding are omitted.
                pass

    def statfs(self):
        """Report filesystem statistics; capacity figures are all zero."""
        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024
        st.f_blocks = 0
        st.f_files = 0

        st.f_bfree = 0
        st.f_bavail = 0

        st.f_ffree = 0
        st.f_favail = 0

        st.f_frsize = 0
        return st

    # The llfuse documentation recommends only overloading functions that
    # are actually implemented, as the default implementation will raise ENOSYS.
    # However, there is a bug in the llfuse default implementation of create()
    # "create() takes exactly 5 positional arguments (6 given)" which will crash
    # arv-mount.
    # The workaround is to implement it with the proper number of parameters,
    # and then everything works out.
    def create(self, inode_parent, name, mode, flags, ctx):
        raise llfuse.FUSEError(errno.EROFS)