2 # FUSE driver for Arvados Keep
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
_logger = logging.getLogger('arvados.arvados_fuse')

# Send llfuse's own log records, from DEBUG upward, to a plain StreamHandler
# (which writes to sys.stderr by default).
# NOTE(review): this runs unconditionally at import time, so every process that
# imports this module gets verbose llfuse debug output; consider making it opt-in.
llogger = logging.getLogger('llfuse')
log_handler = logging.StreamHandler()
llogger.setLevel(logging.DEBUG)
llogger.addHandler(log_handler)
38 """Connects a numeric file handle to a File or Directory object that has
39 been opened by the client."""
41 def __init__(self, fh, obj):
50 with llfuse.lock_released:
51 return self.obj.flush()
class FileHandle(Handle):
    """Binds a numeric FUSE file handle to a File object opened by a client.

    No behaviour is defined here beyond what Handle provides; the subclass
    exists so file handles and directory handles are distinct types.
    """
class DirectoryHandle(Handle):
    """Binds a numeric FUSE file handle to a Directory object opened by a client.

    In addition to what Handle stores, it keeps the directory listing captured
    when the directory was opened (``entries``). Successive readdir calls index
    into that captured list by offset.
    """

    def __init__(self, fh, dirobj, entries):
        # Hand the handle number and the directory object to Handle.__init__(fh, obj).
        super(DirectoryHandle, self).__init__(fh=fh, obj=dirobj)
        # Listing captured at open time; opendir builds it as (name, object) pairs.
        self.entries = entries
69 class InodeCache(object):
70 def __init__(self, cap):
71 self._entries = collections.OrderedDict()
72 self._counter = itertools.count(1)
76 def _remove(self, obj, clear):
77 if clear and not obj.clear():
78 _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
80 self._total -= obj._cache_size
81 del self._entries[obj._cache_priority]
82 _logger.debug("Cleared %s total now %i", obj, self._total)
86 _logger.debug("total is %i cap is %i", self._total, self.cap)
87 if self._total > self.cap:
89 for key in list(self._entries.keys()):
90 if self._total < self.cap or len(self._entries) < 4:
92 self._remove(self._entries[key], True)
95 def manage(self, obj):
97 obj._cache_priority = next(self._counter)
98 obj._cache_size = obj.objsize()
99 self._entries[obj._cache_priority] = obj
100 self._total += obj.objsize()
101 _logger.debug("Managing %s total now %i", obj, self._total)
104 def touch(self, obj):
106 if obj._cache_priority in self._entries:
107 self._remove(obj, False)
109 _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
111 def unmanage(self, obj):
112 if obj.persisted() and obj._cache_priority in self._entries:
113 self._remove(obj, True)
116 class Inodes(object):
117 """Manage the set of inodes. This is the mapping from a numeric id
118 to a concrete File or Directory object"""
120 def __init__(self, inode_cache=256*1024*1024):
122 self._counter = itertools.count(llfuse.ROOT_INODE)
123 self._obj_cache = InodeCache(cap=inode_cache)
125 def __getitem__(self, item):
126 return self._entries[item]
128 def __setitem__(self, key, item):
129 self._entries[key] = item
132 return self._entries.iterkeys()
135 return self._entries.items()
137 def __contains__(self, k):
138 return k in self._entries
140 def touch(self, entry):
141 entry._atime = time.time()
142 self._obj_cache.touch(entry)
145 self._obj_cache.cap_cache()
147 def add_entry(self, entry):
148 entry.inode = next(self._counter)
149 self._entries[entry.inode] = entry
150 self._obj_cache.manage(entry)
153 def del_entry(self, entry):
154 if entry.ref_count == 0:
155 _logger.warn("Deleting inode %i", entry.inode)
156 self._obj_cache.unmanage(entry)
157 llfuse.invalidate_inode(entry.inode)
158 del self._entries[entry.inode]
160 _logger.warn("Inode %i has refcount %i", entry.inode, entry.ref_count)
163 def catch_exceptions(orig_func):
164 @functools.wraps(orig_func)
165 def catch_exceptions_wrapper(self, *args, **kwargs):
167 return orig_func(self, *args, **kwargs)
168 except llfuse.FUSEError:
170 except EnvironmentError as e:
171 raise llfuse.FUSEError(e.errno)
173 _logger.exception("Unhandled exception during FUSE operation")
174 raise llfuse.FUSEError(errno.EIO)
176 return catch_exceptions_wrapper
179 class Operations(llfuse.Operations):
180 """This is the main interface with llfuse.
182 The methods on this object are called by llfuse threads to service FUSE
183 events to query and read from the file system.
185 llfuse has its own global lock which is acquired before calling a request handler,
186 so request handlers do not run concurrently unless the lock is explicitly released
187 using 'with llfuse.lock_released:'
191 def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
192 super(Operations, self).__init__()
194 self.inodes = Inodes(inode_cache)
197 self.encoding = encoding
199 # dict of inode to filehandle
200 self._filehandles = {}
201 self._filehandles_counter = 1
203 # Other threads that need to wait until the fuse driver
204 # is fully initialized should wait() on this event object.
205 self.initlock = threading.Event()
207 self.num_retries = num_retries
210 # Allow threads that are waiting for the driver to be finished
211 # initializing to continue
214 def access(self, inode, mode, ctx):
218 def getattr(self, inode):
219 if inode not in self.inodes:
220 raise llfuse.FUSEError(errno.ENOENT)
222 e = self.inodes[inode]
224 entry = llfuse.EntryAttributes()
227 entry.entry_timeout = 300
228 entry.attr_timeout = 300
230 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
231 if isinstance(e, Directory):
232 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
234 entry.st_mode |= stat.S_IFREG
235 if isinstance(e, FuseArvadosFile):
236 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
239 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
242 entry.st_uid = self.uid
243 entry.st_gid = self.gid
246 entry.st_size = e.size()
248 entry.st_blksize = 512
249 entry.st_blocks = (e.size()/512)+1
250 entry.st_atime = int(e.atime())
251 entry.st_mtime = int(e.mtime())
252 entry.st_ctime = int(e.mtime())
257 def lookup(self, parent_inode, name):
258 name = unicode(name, self.encoding)
264 if parent_inode in self.inodes:
265 p = self.inodes[parent_inode]
267 inode = p.parent_inode
268 elif isinstance(p, Directory) and name in p:
269 inode = p[name].inode
272 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
273 parent_inode, name, inode)
274 self.inodes[inode].inc_ref()
275 return self.getattr(inode)
277 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
279 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Drop kernel lookup references for a batch of inodes.

    ``inodes`` yields (inode, nlookup) pairs. Each entry's reference count is
    decremented by nlookup. An entry is removed from the inode table only when
    that leaves zero references AND the entry is marked ``dead``.
    """
    for pair in inodes:
        inode, nlookup = pair
        _logger.debug("arv-mount forget: %i %i", inode, nlookup)
        entry = self.inodes[inode]
        # dec_ref always runs; ``dead`` is consulted only when the count hits 0.
        remaining = entry.dec_ref(nlookup)
        if remaining == 0 and entry.dead:
            self.inodes.del_entry(entry)
290 def open(self, inode, flags):
291 if inode in self.inodes:
292 p = self.inodes[inode]
294 raise llfuse.FUSEError(errno.ENOENT)
296 if isinstance(p, Directory):
297 raise llfuse.FUSEError(errno.EISDIR)
299 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
300 raise llfuse.FUSEError(errno.EPERM)
302 fh = self._filehandles_counter
303 self._filehandles_counter += 1
304 self._filehandles[fh] = FileHandle(fh, p)
309 def read(self, fh, off, size):
310 _logger.debug("arv-mount read %i %i %i", fh, off, size)
311 if fh in self._filehandles:
312 handle = self._filehandles[fh]
314 raise llfuse.FUSEError(errno.EBADF)
316 self.inodes.touch(handle.obj)
319 with llfuse.lock_released:
320 return handle.obj.readfrom(off, size, self.num_retries)
321 except arvados.errors.NotFoundError as e:
322 _logger.warning("Block not found: " + str(e))
323 raise llfuse.FUSEError(errno.EIO)
326 def write(self, fh, off, buf):
327 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
328 if fh in self._filehandles:
329 handle = self._filehandles[fh]
331 raise llfuse.FUSEError(errno.EBADF)
333 if not handle.obj.writable():
334 raise llfuse.FUSEError(errno.EPERM)
336 self.inodes.touch(handle.obj)
338 with llfuse.lock_released:
339 return handle.obj.writeto(off, buf, self.num_retries)
342 def release(self, fh):
343 if fh in self._filehandles:
345 self._filehandles[fh].flush()
346 except EnvironmentError as e:
347 raise llfuse.FUSEError(e.errno)
349 _logger.exception("Flush error")
350 self._filehandles[fh].release()
351 del self._filehandles[fh]
352 self.inodes.cap_cache()
354 def releasedir(self, fh):
358 def opendir(self, inode):
359 _logger.debug("arv-mount opendir: inode %i", inode)
361 if inode in self.inodes:
362 p = self.inodes[inode]
364 raise llfuse.FUSEError(errno.ENOENT)
366 if not isinstance(p, Directory):
367 raise llfuse.FUSEError(errno.ENOTDIR)
369 fh = self._filehandles_counter
370 self._filehandles_counter += 1
371 if p.parent_inode in self.inodes:
372 parent = self.inodes[p.parent_inode]
374 raise llfuse.FUSEError(errno.EIO)
379 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
383 def readdir(self, fh, off):
384 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
386 if fh in self._filehandles:
387 handle = self._filehandles[fh]
389 raise llfuse.FUSEError(errno.EBADF)
391 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
394 while e < len(handle.entries):
395 if handle.entries[e][1].inode in self.inodes:
397 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
398 except UnicodeEncodeError:
404 st = llfuse.StatvfsData()
405 st.f_bsize = 64 * 1024
418 def _check_writable(self, inode_parent):
419 if inode_parent in self.inodes:
420 p = self.inodes[inode_parent]
422 raise llfuse.FUSEError(errno.ENOENT)
424 if not isinstance(p, Directory):
425 raise llfuse.FUSEError(errno.ENOTDIR)
428 raise llfuse.FUSEError(errno.EPERM)
430 if not isinstance(p, CollectionDirectoryBase):
431 raise llfuse.FUSEError(errno.EPERM)
436 def create(self, inode_parent, name, mode, flags, ctx):
437 p = self._check_writable(inode_parent)
439 with llfuse.lock_released:
440 p.collection.open(name, "w")
442 # The file entry should have been implicitly created by callback.
444 fh = self._filehandles_counter
445 self._filehandles_counter += 1
446 self._filehandles[fh] = FileHandle(fh, f)
450 return (fh, self.getattr(f.inode))
453 def mkdir(self, inode_parent, name, mode, ctx):
454 p = self._check_writable(inode_parent)
456 with llfuse.lock_released:
457 p.collection.mkdirs(name)
459 # The dir entry should have been implicitly created by callback.
463 return self.getattr(d.inode)
def unlink(self, inode_parent, name):
    """Remove ``name`` from the collection behind directory ``inode_parent``.

    _check_writable raises llfuse.FUSEError (ENOENT, ENOTDIR or EPERM) when the
    parent cannot be modified. The removal itself runs with the llfuse global
    lock released, so other request handlers can proceed meanwhile.
    """
    target_dir = self._check_writable(inode_parent)
    with llfuse.lock_released:
        target_dir.collection.remove(name)
def rmdir(self, inode_parent, name):
    """Remove directory ``name`` from directory ``inode_parent``.

    Delegates to unlink, so this layer treats file and directory removal the
    same way. NOTE(review): nothing here checks that ``name`` actually is a
    directory; confirm whether ENOTDIR should be raised for plain files.
    """
    self.unlink(inode_parent=inode_parent, name=name)
476 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
477 src = self._check_writable(inode_parent_old)
478 dest = self._check_writable(inode_parent_new)
480 with llfuse.lock_released:
481 dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
487 if fh in self._filehandles:
488 self._filehandles[fh].flush()
490 def fsync(self, fh, datasync):
493 def fsyncdir(self, fh, datasync):