2 # FUSE driver for Arvados Keep
26 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
27 from fusefile import StringFile, FuseArvadosFile
29 _logger = logging.getLogger('arvados.arvados_fuse')
31 # log_handler = logging.StreamHandler()
32 # llogger = logging.getLogger('llfuse')
33 # llogger.addHandler(log_handler)
34 # llogger.setLevel(logging.DEBUG)
36 class FileHandle(object):
37 """Connects a numeric file handle to a File object that has
38 been opened by the client."""
40 def __init__(self, fh, fileobj):
42 self.fileobj = fileobj
43 self.fileobj.inc_use()
46 self.fileobj.dec_use()
49 class DirectoryHandle(object):
50 """Connects a numeric file handle to a Directory object that has
51 been opened by the client."""
53 def __init__(self, fh, dirobj, entries):
55 self.entries = entries
63 class InodeCache(object):
64 def __init__(self, cap):
65 self._entries = collections.OrderedDict()
66 self._counter = itertools.count(1)
70 def _remove(self, obj, clear):
71 if clear and not obj.clear():
72 _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
74 self._total -= obj._cache_size
75 del self._entries[obj._cache_priority]
76 _logger.debug("Cleared %s total now %i", obj, self._total)
80 _logger.debug("total is %i cap is %i", self._total, self.cap)
81 if self._total > self.cap:
83 for key in list(self._entries.keys()):
84 if self._total < self.cap or len(self._entries) < 4:
86 self._remove(self._entries[key], True)
89 def manage(self, obj):
91 obj._cache_priority = next(self._counter)
92 obj._cache_size = obj.objsize()
93 self._entries[obj._cache_priority] = obj
94 self._total += obj.objsize()
95 _logger.debug("Managing %s total now %i", obj, self._total)
100 if obj._cache_priority in self._entries:
101 self._remove(obj, False)
103 _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
105 def unmanage(self, obj):
106 if obj.persisted() and obj._cache_priority in self._entries:
107 self._remove(obj, True)
109 class Inodes(object):
110 """Manage the set of inodes. This is the mapping from a numeric id
111 to a concrete File or Directory object"""
113 def __init__(self, inode_cache=256*1024*1024):
115 self._counter = itertools.count(llfuse.ROOT_INODE)
116 self._obj_cache = InodeCache(cap=inode_cache)
118 def __getitem__(self, item):
119 return self._entries[item]
121 def __setitem__(self, key, item):
122 self._entries[key] = item
125 return self._entries.iterkeys()
128 return self._entries.items()
130 def __contains__(self, k):
131 return k in self._entries
133 def touch(self, entry):
134 entry._atime = time.time()
135 self._obj_cache.touch(entry)
138 self._obj_cache.cap_cache()
140 def add_entry(self, entry):
141 entry.inode = next(self._counter)
142 self._entries[entry.inode] = entry
143 self._obj_cache.manage(entry)
146 def del_entry(self, entry):
147 self._obj_cache.unmanage(entry)
148 llfuse.invalidate_inode(entry.inode)
149 del self._entries[entry.inode]
152 class Operations(llfuse.Operations):
153 """This is the main interface with llfuse.
155 The methods on this object are called by llfuse threads to service FUSE
156 events to query and read from the file system.
158 llfuse has its own global lock which is acquired before calling a request handler,
159 so request handlers do not run concurrently unless the lock is explicitly released
160 using 'with llfuse.lock_released:'
164 def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
165 super(Operations, self).__init__()
167 self.inodes = Inodes(inode_cache)
170 self.encoding = encoding
172 # dict of inode to filehandle
173 self._filehandles = {}
174 self._filehandles_counter = 1
176 # Other threads that need to wait until the fuse driver
177 # is fully initialized should wait() on this event object.
178 self.initlock = threading.Event()
180 self.num_retries = num_retries
183 # Allow threads that are waiting for the driver to be finished
184 # initializing to continue
187 def access(self, inode, mode, ctx):
190 def getattr(self, inode):
191 if inode not in self.inodes:
192 raise llfuse.FUSEError(errno.ENOENT)
194 e = self.inodes[inode]
196 entry = llfuse.EntryAttributes()
199 entry.entry_timeout = 300
200 entry.attr_timeout = 300
202 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
203 if isinstance(e, Directory):
204 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
206 entry.st_mode |= stat.S_IFREG
207 if isinstance(e, FuseArvadosFile):
208 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
211 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
214 entry.st_uid = self.uid
215 entry.st_gid = self.gid
218 entry.st_size = e.size()
220 entry.st_blksize = 512
221 entry.st_blocks = (e.size()/512)+1
222 entry.st_atime = int(e.atime())
223 entry.st_mtime = int(e.mtime())
224 entry.st_ctime = int(e.mtime())
228 def lookup(self, parent_inode, name):
229 name = unicode(name, self.encoding)
230 _logger.debug("arv-mount lookup: parent_inode %i name %s",
237 if parent_inode in self.inodes:
238 p = self.inodes[parent_inode]
240 inode = p.parent_inode
241 elif isinstance(p, Directory) and name in p:
242 inode = p[name].inode
245 return self.getattr(inode)
247 raise llfuse.FUSEError(errno.ENOENT)
249 def open(self, inode, flags):
250 if inode in self.inodes:
251 p = self.inodes[inode]
253 raise llfuse.FUSEError(errno.ENOENT)
255 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
256 raise llfuse.FUSEError(errno.EROFS)
258 if isinstance(p, Directory):
259 raise llfuse.FUSEError(errno.EISDIR)
261 fh = self._filehandles_counter
262 self._filehandles_counter += 1
263 self._filehandles[fh] = FileHandle(fh, p)
267 def read(self, fh, off, size):
268 _logger.debug("arv-mount read %i %i %i", fh, off, size)
269 if fh in self._filehandles:
270 handle = self._filehandles[fh]
272 raise llfuse.FUSEError(errno.EBADF)
274 self.inodes.touch(handle.fileobj)
277 with llfuse.lock_released:
278 return handle.fileobj.readfrom(off, size, self.num_retries)
279 except arvados.errors.NotFoundError as e:
280 _logger.warning("Block not found: " + str(e))
281 raise llfuse.FUSEError(errno.EIO)
283 _logger.exception("Read error")
284 raise llfuse.FUSEError(errno.EIO)
286 def release(self, fh):
287 if fh in self._filehandles:
288 self._filehandles[fh].release()
289 del self._filehandles[fh]
290 self.inodes.cap_cache()
292 def releasedir(self, fh):
295 def opendir(self, inode):
296 _logger.debug("arv-mount opendir: inode %i", inode)
298 if inode in self.inodes:
299 p = self.inodes[inode]
301 raise llfuse.FUSEError(errno.ENOENT)
303 if not isinstance(p, Directory):
304 raise llfuse.FUSEError(errno.ENOTDIR)
306 fh = self._filehandles_counter
307 self._filehandles_counter += 1
308 if p.parent_inode in self.inodes:
309 parent = self.inodes[p.parent_inode]
311 raise llfuse.FUSEError(errno.EIO)
316 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
320 def readdir(self, fh, off):
321 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
323 if fh in self._filehandles:
324 handle = self._filehandles[fh]
326 raise llfuse.FUSEError(errno.EBADF)
328 _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
331 while e < len(handle.entries):
332 if handle.entries[e][1].inode in self.inodes:
334 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
335 except UnicodeEncodeError:
340 st = llfuse.StatvfsData()
341 st.f_bsize = 64 * 1024
354 # The llfuse documentation recommends only overloading functions that
355 # are actually implemented, as the default implementation will raise ENOSYS.
356 # However, there is a bug in the llfuse default implementation of create()
357 # "create() takes exactly 5 positional arguments (6 given)" which will crash
359 # The workaround is to implement it with the proper number of parameters,
360 # and then everything works out.
361 def create(self, inode_parent, name, mode, flags, ctx):
362 raise llfuse.FUSEError(errno.EROFS)