2 # FUSE driver for Arvados Keep
26 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
27 from fusefile import StringFile, FuseArvadosFile
29 _logger = logging.getLogger('arvados.arvados_fuse')
32 class FileHandle(object):
33 """Connects a numeric file handle to a File object that has
34 been opened by the client."""
36 def __init__(self, fh, fileobj):
38 self.fileobj = fileobj
39 self.fileobj.inc_use()
42 self.fileobj.dec_use()
45 class DirectoryHandle(object):
46 """Connects a numeric file handle to a Directory object that has
47 been opened by the client."""
49 def __init__(self, fh, dirobj, entries):
51 self.entries = entries
59 class InodeCache(object):
60 def __init__(self, cap):
61 self._entries = collections.OrderedDict()
62 self._counter = itertools.count(1)
66 def _remove(self, obj, clear):
67 if clear and not obj.clear():
68 _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
70 self._total -= obj._cache_size
71 del self._entries[obj._cache_priority]
72 _logger.debug("Cleared %s total now %i", obj, self._total)
76 _logger.debug("total is %i cap is %i", self._total, self.cap)
77 if self._total > self.cap:
79 for key in list(self._entries.keys()):
80 if self._total < self.cap or len(self._entries) < 4:
82 self._remove(self._entries[key], True)
85 def manage(self, obj):
87 obj._cache_priority = next(self._counter)
88 obj._cache_size = obj.objsize()
89 self._entries[obj._cache_priority] = obj
90 self._total += obj.objsize()
91 _logger.debug("Managing %s total now %i", obj, self._total)
96 if obj._cache_priority in self._entries:
97 self._remove(obj, False)
99 _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
def unmanage(self, obj):
    """Drop ``obj`` from the cache bookkeeping, clearing it on the way out.

    Nothing happens unless ``obj.persisted()`` is true and the object's
    ``_cache_priority`` is currently a key of ``self._entries``.
    """
    # Check persisted() first: _cache_priority is only consulted for
    # persisted objects, exactly as a short-circuiting ``and`` would.
    if not obj.persisted():
        return
    if obj._cache_priority in self._entries:
        self._remove(obj, True)
105 class Inodes(object):
106 """Manage the set of inodes. This is the mapping from a numeric id
107 to a concrete File or Directory object"""
109 def __init__(self, inode_cache=256*1024*1024):
111 self._counter = itertools.count(llfuse.ROOT_INODE)
112 self._obj_cache = InodeCache(cap=inode_cache)
def __getitem__(self, item):
    """Return the object stored in ``self._entries`` under the id ``item``.

    Any lookup error raised by ``self._entries`` propagates to the caller.
    """
    registry = self._entries
    return registry[item]
def __setitem__(self, key, item):
    """Store ``item`` in ``self._entries`` under the id ``key``."""
    registry = self._entries
    registry[key] = item
121 return self._entries.iterkeys()
124 return self._entries.items()
def __contains__(self, k):
    """Report whether the id ``k`` is present in ``self._entries``."""
    registry = self._entries
    return k in registry
def touch(self, entry):
    """Record an access to ``entry``.

    Sets ``entry._atime`` to the current wall-clock time and then forwards
    the entry to the object cache's own ``touch``.
    """
    now = time.time()
    entry._atime = now
    cache = self._obj_cache
    cache.touch(entry)
134 self._obj_cache.cap_cache()
136 def add_entry(self, entry):
137 entry.inode = next(self._counter)
138 self._entries[entry.inode] = entry
139 self._obj_cache.manage(entry)
def del_entry(self, entry):
    """Remove ``entry`` from this inode table.

    In order: the object cache is asked to unmanage the entry, llfuse is
    told to invalidate the entry's inode, and finally the entry is deleted
    from ``self._entries``.
    """
    self._obj_cache.unmanage(entry)
    # Read the inode number only after unmanage(), matching the original
    # evaluation order.
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
148 class Operations(llfuse.Operations):
149 """This is the main interface with llfuse.
151 The methods on this object are called by llfuse threads to service FUSE
152 events to query and read from the file system.
154 llfuse has its own global lock which is acquired before calling a request handler,
155 so request handlers do not run concurrently unless the lock is explicitly released
156 using 'with llfuse.lock_released:'
160 def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
161 super(Operations, self).__init__()
163 self.inodes = Inodes(inode_cache)
166 self.encoding = encoding
168 # dict of inode to filehandle
169 self._filehandles = {}
170 self._filehandles_counter = 1
172 # Other threads that need to wait until the fuse driver
173 # is fully initialized should wait() on this event object.
174 self.initlock = threading.Event()
176 self.num_retries = num_retries
179 # Allow threads that are waiting for the driver to be finished
180 # initializing to continue
183 def access(self, inode, mode, ctx):
186 def getattr(self, inode):
187 if inode not in self.inodes:
188 raise llfuse.FUSEError(errno.ENOENT)
190 e = self.inodes[inode]
192 entry = llfuse.EntryAttributes()
195 entry.entry_timeout = 300
196 entry.attr_timeout = 300
198 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
199 if isinstance(e, Directory):
200 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
201 elif isinstance(e, FuseArvadosFile):
202 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
204 entry.st_mode |= stat.S_IFREG
207 entry.st_uid = self.uid
208 entry.st_gid = self.gid
211 entry.st_size = e.size()
213 entry.st_blksize = 512
214 entry.st_blocks = (e.size()/512)+1
215 entry.st_atime = int(e.atime())
216 entry.st_mtime = int(e.mtime())
217 entry.st_ctime = int(e.mtime())
221 def lookup(self, parent_inode, name):
222 name = unicode(name, self.encoding)
223 _logger.debug("arv-mount lookup: parent_inode %i name %s",
230 if parent_inode in self.inodes:
231 p = self.inodes[parent_inode]
233 inode = p.parent_inode
234 elif isinstance(p, Directory) and name in p:
235 inode = p[name].inode
238 return self.getattr(inode)
240 raise llfuse.FUSEError(errno.ENOENT)
242 def open(self, inode, flags):
243 if inode in self.inodes:
244 p = self.inodes[inode]
246 raise llfuse.FUSEError(errno.ENOENT)
248 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
249 raise llfuse.FUSEError(errno.EROFS)
251 if isinstance(p, Directory):
252 raise llfuse.FUSEError(errno.EISDIR)
254 fh = self._filehandles_counter
255 self._filehandles_counter += 1
256 self._filehandles[fh] = FileHandle(fh, p)
260 def read(self, fh, off, size):
261 _logger.debug("arv-mount read %i %i %i", fh, off, size)
262 if fh in self._filehandles:
263 handle = self._filehandles[fh]
265 raise llfuse.FUSEError(errno.EBADF)
267 self.inodes.touch(handle.fileobj)
270 with llfuse.lock_released:
271 return handle.fileobj.readfrom(off, size, self.num_retries)
272 except arvados.errors.NotFoundError as e:
273 _logger.warning("Block not found: " + str(e))
274 raise llfuse.FUSEError(errno.EIO)
276 _logger.exception("Read error")
277 raise llfuse.FUSEError(errno.EIO)
279 def release(self, fh):
280 if fh in self._filehandles:
281 self._filehandles[fh].release()
282 del self._filehandles[fh]
283 self.inodes.cap_cache()
285 def releasedir(self, fh):
288 def opendir(self, inode):
289 _logger.debug("arv-mount opendir: inode %i", inode)
291 if inode in self.inodes:
292 p = self.inodes[inode]
294 raise llfuse.FUSEError(errno.ENOENT)
296 if not isinstance(p, Directory):
297 raise llfuse.FUSEError(errno.ENOTDIR)
299 fh = self._filehandles_counter
300 self._filehandles_counter += 1
301 if p.parent_inode in self.inodes:
302 parent = self.inodes[p.parent_inode]
304 raise llfuse.FUSEError(errno.EIO)
309 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
313 def readdir(self, fh, off):
314 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
316 if fh in self._filehandles:
317 handle = self._filehandles[fh]
319 raise llfuse.FUSEError(errno.EBADF)
321 _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
324 while e < len(handle.entries):
325 if handle.entries[e][1].inode in self.inodes:
327 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
328 except UnicodeEncodeError:
333 st = llfuse.StatvfsData()
334 st.f_bsize = 64 * 1024
347 # The llfuse documentation recommends only overloading functions that
348 # are actually implemented, as the default implementation will raise ENOSYS.
349 # However, there is a bug in the llfuse default implementation of create()
350 # "create() takes exactly 5 positional arguments (6 given)" which will crash
352 # The workaround is to implement it with the proper number of parameters,
353 # and then everything works out.
def create(self, inode_parent, name, mode, flags, ctx):
    """Refuse every file-creation request with ``EROFS``.

    This handler is defined explicitly, with this parameter list, to work
    around an argument-count bug in llfuse's default ``create()``.

    Raises:
        llfuse.FUSEError: always, carrying ``errno.EROFS``.
    """
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error