2 # FUSE driver for Arvados Keep
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
# Logger used by this module for its own messages.
_logger = logging.getLogger('arvados.arvados_fuse')

# Attach a stream handler to the 'llfuse' logger and set that logger to
# DEBUG level.
llogger = logging.getLogger('llfuse')
log_handler = logging.StreamHandler()
llogger.setLevel(logging.DEBUG)
llogger.addHandler(log_handler)
38 """Connects a numeric file handle to a File or Directory object that has
39 been opened by the client."""
41 def __init__(self, fh, obj):
50 with llfuse.lock_released:
51 return self.obj.flush()
class FileHandle(Handle):
    """Handle subclass that pairs a numeric file handle with the File
    object the client has opened."""
class DirectoryHandle(Handle):
    """Handle subclass that pairs a numeric file handle with the Directory
    object the client has opened, plus the entries supplied for it."""

    def __init__(self, fh, dirobj, entries):
        """Initialise the base Handle, then remember the entry list.

        fh and dirobj are forwarded unchanged to Handle.__init__;
        entries is stored as-is on self.entries.
        """
        # Explicit two-argument super() keeps this Python 2 compatible.
        base = super(DirectoryHandle, self)
        base.__init__(fh, dirobj)
        self.entries = entries
69 class InodeCache(object):
70 def __init__(self, cap, min_entries=4):
71 self._entries = collections.OrderedDict()
73 self._counter = itertools.count(0)
76 self.min_entries = min_entries
81 def _remove(self, obj, clear):
82 if clear and not obj.clear():
83 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
85 self._total -= obj.cache_size
86 del self._entries[obj.cache_priority]
88 del self._by_uuid[obj.cache_uuid]
91 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
95 #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
96 if self._total > self.cap:
97 for key in list(self._entries.keys()):
98 if self._total < self.cap or len(self._entries) < self.min_entries:
100 self._remove(self._entries[key], True)
102 def manage(self, obj):
104 obj.cache_priority = next(self._counter)
105 obj.cache_size = obj.objsize()
106 self._entries[obj.cache_priority] = obj
107 obj.cache_uuid = obj.uuid()
109 self._by_uuid[obj.cache_uuid] = obj
110 self._total += obj.objsize()
111 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
114 obj.cache_priority = None
116 def touch(self, obj):
118 if obj.cache_priority in self._entries:
119 self._remove(obj, False)
def unmanage(self, obj):
    """Drop obj from the cache if it is currently tracked.

    Nothing happens unless obj.persisted() is true and obj.cache_priority
    is a key of self._entries; in that case the object is handed to
    self._remove with clearing requested.
    """
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    self._remove(obj, True)
def find(self, uuid):
    """Return whatever self._by_uuid.get(uuid) yields for this uuid."""
    match = self._by_uuid.get(uuid)
    return match
129 class Inodes(object):
130 """Manage the set of inodes. This is the mapping from a numeric id
131 to a concrete File or Directory object"""
133 def __init__(self, inode_cache):
135 self._counter = itertools.count(llfuse.ROOT_INODE)
136 self.inode_cache = inode_cache
138 def __getitem__(self, item):
139 return self._entries[item]
141 def __setitem__(self, key, item):
142 self._entries[key] = item
145 return self._entries.iterkeys()
148 return self._entries.items()
150 def __contains__(self, k):
151 return k in self._entries
def touch(self, entry):
    """Stamp entry with the current time and pass it to the inode cache.

    Sets entry._atime to time.time() and then calls
    self.inode_cache.touch(entry).
    """
    now = time.time()
    entry._atime = now
    cache = self.inode_cache
    cache.touch(entry)
157 def add_entry(self, entry):
158 entry.inode = next(self._counter)
159 if entry.inode == llfuse.ROOT_INODE:
161 self._entries[entry.inode] = entry
162 self.inode_cache.manage(entry)
165 def del_entry(self, entry):
166 if entry.ref_count == 0:
167 _logger.debug("Deleting inode %i", entry.inode)
168 self.inode_cache.unmanage(entry)
169 llfuse.invalidate_inode(entry.inode)
170 del self._entries[entry.inode]
174 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def catch_exceptions(orig_func):
    """Decorator that converts any failure of orig_func into llfuse.FUSEError.

    The wrapped callable is invoked as orig_func(self, *args, **kwargs) and
    its return value is passed through unchanged.  Exceptions are handled
    as follows:

    * llfuse.FUSEError is re-raised untouched.
    * EnvironmentError is translated to llfuse.FUSEError carrying the same
      errno.  When the exception has no errno (e.errno is None), it is
      logged and reported as EIO instead of passing None to FUSEError.
    * Anything else is logged with a traceback and reported as EIO.
    """
    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            # Already the exception type this wrapper produces.
            raise
        except EnvironmentError as e:
            if e.errno is None:
                # e.g. IOError("message") carries no errno; fall back to
                # a generic I/O error and keep the traceback in the log.
                _logger.exception("EnvironmentError without errno during FUSE operation")
                raise llfuse.FUSEError(errno.EIO)
            raise llfuse.FUSEError(e.errno)
        except:
            # Catch-all: every remaining failure is reported as EIO.
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
192 class Operations(llfuse.Operations):
193 """This is the main interface with llfuse.
195 The methods on this object are called by llfuse threads to service FUSE
196 events to query and read from the file system.
198 llfuse has its own global lock which is acquired before calling a request handler,
199 so request handlers do not run concurrently unless the lock is explicitly released
200 using 'with llfuse.lock_released:'
204 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
205 super(Operations, self).__init__()
208 inode_cache = InodeCache(cap=256*1024*1024)
209 self.inodes = Inodes(inode_cache)
212 self.encoding = encoding
214 # dict of inode to filehandle
215 self._filehandles = {}
216 self._filehandles_counter = itertools.count(0)
218 # Other threads that need to wait until the fuse driver
219 # is fully initialized should wait() on this event object.
220 self.initlock = threading.Event()
222 self.num_retries = num_retries
227 # Allow threads that are waiting for the driver to be finished
228 # initializing to continue
235 def access(self, inode, mode, ctx):
238 def listen_for_events(self, api_client):
239 self.event = arvados.events.subscribe(api_client,
240 [["event_type", "in", ["create", "update", "delete"]]],
243 def on_event(self, ev):
244 if 'event_type' in ev:
246 item = self.inodes.inode_cache.find(ev["object_uuid"])
251 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
253 itemparent.invalidate()
257 def getattr(self, inode):
258 if inode not in self.inodes:
259 raise llfuse.FUSEError(errno.ENOENT)
261 e = self.inodes[inode]
263 entry = llfuse.EntryAttributes()
266 entry.entry_timeout = 300
267 entry.attr_timeout = 300
269 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
270 if isinstance(e, Directory):
271 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
273 entry.st_mode |= stat.S_IFREG
274 if isinstance(e, FuseArvadosFile):
275 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
278 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
281 entry.st_uid = self.uid
282 entry.st_gid = self.gid
285 entry.st_size = e.size()
287 entry.st_blksize = 512
288 entry.st_blocks = (e.size()/512)+1
289 entry.st_atime = int(e.atime())
290 entry.st_mtime = int(e.mtime())
291 entry.st_ctime = int(e.mtime())
296 def lookup(self, parent_inode, name):
297 name = unicode(name, self.encoding)
303 if parent_inode in self.inodes:
304 p = self.inodes[parent_inode]
306 inode = p.parent_inode
307 elif isinstance(p, Directory) and name in p:
308 inode = p[name].inode
311 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
312 parent_inode, name, inode)
313 self.inodes[inode].inc_ref()
314 return self.getattr(inode)
316 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
318 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Process a batch of (inode, nlookup) pairs.

    For each pair the entry's reference count is decreased via
    dec_ref(nlookup).  When that call returns 0 and the entry is flagged
    dead, the entry is handed to self.inodes.del_entry.
    """
    for inode, nlookup in inodes:
        ent = self.inodes[inode]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
        remaining = ent.dec_ref(nlookup)
        if remaining == 0:
            if ent.dead:
                self.inodes.del_entry(ent)
329 def open(self, inode, flags):
330 if inode in self.inodes:
331 p = self.inodes[inode]
333 raise llfuse.FUSEError(errno.ENOENT)
335 if isinstance(p, Directory):
336 raise llfuse.FUSEError(errno.EISDIR)
338 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
339 raise llfuse.FUSEError(errno.EPERM)
341 fh = next(self._filehandles_counter)
342 self._filehandles[fh] = FileHandle(fh, p)
def read(self, fh, off, size):
    """Read from the object behind file handle fh.

    Raises llfuse.FUSEError(EBADF) when fh is not a known handle.  The
    object is touched in the inode table, then its
    readfrom(off, size, self.num_retries) result is returned; that call
    runs inside 'with llfuse.lock_released'.  An
    arvados.errors.NotFoundError is logged as a warning and converted to
    llfuse.FUSEError(EIO).
    """
    _logger.debug("arv-mount read %i %i %i", fh, off, size)
    if fh not in self._filehandles:
        raise llfuse.FUSEError(errno.EBADF)
    handle = self._filehandles[fh]

    self.inodes.touch(handle.obj)

    try:
        with llfuse.lock_released:
            data = handle.obj.readfrom(off, size, self.num_retries)
            return data
    except arvados.errors.NotFoundError as e:
        _logger.warning("Block not found: " + str(e))
        raise llfuse.FUSEError(errno.EIO)
def write(self, fh, off, buf):
    """Write buf at offset off to the object behind file handle fh.

    Raises llfuse.FUSEError(EBADF) when fh is not a known handle and
    llfuse.FUSEError(EPERM) when the object reports it is not writable.
    Otherwise the object is touched in the inode table and the result of
    its writeto(off, buf, self.num_retries) is returned; that call runs
    inside 'with llfuse.lock_released'.
    """
    _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
    if fh not in self._filehandles:
        raise llfuse.FUSEError(errno.EBADF)
    handle = self._filehandles[fh]
    target = handle.obj

    if not target.writable():
        raise llfuse.FUSEError(errno.EPERM)

    self.inodes.touch(target)

    with llfuse.lock_released:
        written = target.writeto(off, buf, self.num_retries)
        return written
380 def release(self, fh):
381 if fh in self._filehandles:
383 self._filehandles[fh].flush()
384 except EnvironmentError as e:
385 raise llfuse.FUSEError(e.errno)
387 _logger.exception("Flush error")
388 self._filehandles[fh].release()
389 del self._filehandles[fh]
390 self.inodes.inode_cache.cap_cache()
392 def releasedir(self, fh):
396 def opendir(self, inode):
397 _logger.debug("arv-mount opendir: inode %i", inode)
399 if inode in self.inodes:
400 p = self.inodes[inode]
402 raise llfuse.FUSEError(errno.ENOENT)
404 if not isinstance(p, Directory):
405 raise llfuse.FUSEError(errno.ENOTDIR)
407 fh = next(self._filehandles_counter)
408 if p.parent_inode in self.inodes:
409 parent = self.inodes[p.parent_inode]
411 raise llfuse.FUSEError(errno.EIO)
416 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
420 def readdir(self, fh, off):
421 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
423 if fh in self._filehandles:
424 handle = self._filehandles[fh]
426 raise llfuse.FUSEError(errno.EBADF)
428 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
431 while e < len(handle.entries):
432 if handle.entries[e][1].inode in self.inodes:
434 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
435 except UnicodeEncodeError:
441 st = llfuse.StatvfsData()
442 st.f_bsize = 64 * 1024
455 def _check_writable(self, inode_parent):
456 if inode_parent in self.inodes:
457 p = self.inodes[inode_parent]
459 raise llfuse.FUSEError(errno.ENOENT)
461 if not isinstance(p, Directory):
462 raise llfuse.FUSEError(errno.ENOTDIR)
465 raise llfuse.FUSEError(errno.EPERM)
470 def create(self, inode_parent, name, mode, flags, ctx):
471 p = self._check_writable(inode_parent)
474 # The file entry should have been implicitly created by callback.
476 fh = next(self._filehandles_counter)
477 self._filehandles[fh] = FileHandle(fh, f)
481 return (fh, self.getattr(f.inode))
484 def mkdir(self, inode_parent, name, mode, ctx):
485 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
487 p = self._check_writable(inode_parent)
490 # The dir entry should have been implicitly created by callback.
494 return self.getattr(d.inode)
497 def unlink(self, inode_parent, name):
498 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
499 p = self._check_writable(inode_parent)
503 def rmdir(self, inode_parent, name):
504 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
505 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Rename name_old under one parent to name_new under another.

    Both parent inodes are passed through self._check_writable (old
    parent first, then new parent); the rename itself is delegated to the
    new parent's rename(name_old, name_new, <old parent>).
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    # Keep the check order: the old parent is validated before the new one.
    old_parent = self._check_writable(inode_parent_old)
    new_parent = self._check_writable(inode_parent_new)
    new_parent.rename(name_old, name_new, old_parent)
517 if fh in self._filehandles:
518 self._filehandles[fh].flush()
520 def fsync(self, fh, datasync):
523 def fsyncdir(self, fh, datasync):