2 # FUSE driver for Arvados Keep
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
# Logger this module uses for its own diagnostics.
_logger = logging.getLogger('arvados.arvados_fuse')

# Route the 'llfuse' logger, at DEBUG verbosity, through a dedicated stream
# handler (logging.StreamHandler writes to stderr unless given a stream).
llogger = logging.getLogger('llfuse')
llogger.setLevel(logging.DEBUG)
log_handler = logging.StreamHandler()
llogger.addHandler(log_handler)
38 """Connects a numeric file handle to a File or Directory object that has
39 been opened by the client."""
41 def __init__(self, fh, obj):
50 with llfuse.lock_released:
51 return self.obj.flush()
54 class FileHandle(Handle):
55 """Connects a numeric file handle to a File object that has
56 been opened by the client."""
class DirectoryHandle(Handle):
    """Handle for a Directory object that a client has opened.

    Besides whatever Handle.__init__ records for the (fh, dirobj) pair, the
    handle keeps the entry list it was given so that later directory reads
    can index into it by offset.
    """

    def __init__(self, fh, dirobj, entries):
        """Set up the base Handle, then remember the directory listing.

        fh      -- numeric file handle
        dirobj  -- the opened Directory object
        entries -- sequence of (name, object) pairs, stored as self.entries
        """
        # Base-class initialization runs first; the listing is attached after.
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
69 class InodeCache(object):
70 def __init__(self, cap):
71 self._entries = collections.OrderedDict()
73 self._counter = itertools.count(1)
77 def _remove(self, obj, clear):
78 if clear and not obj.clear():
79 _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
81 self._total -= obj._cache_size
82 del self._entries[obj._cache_priority]
84 del self._by_uuid[obj._cache_uuid]
85 obj._cache_uuid = None
86 _logger.debug("Cleared %s total now %i", obj, self._total)
90 _logger.debug("total is %i cap is %i", self._total, self.cap)
91 if self._total > self.cap:
92 for key in list(self._entries.keys()):
93 if self._total < self.cap or len(self._entries) < 4:
95 self._remove(self._entries[key], True)
97 def manage(self, obj):
99 obj._cache_priority = next(self._counter)
100 obj._cache_size = obj.objsize()
101 self._entries[obj._cache_priority] = obj
103 obj._cache_uuid = obj.uuid()
104 self._by_uuid[obj._cache_uuid] = obj
105 self._total += obj.objsize()
106 _logger.debug("Managing %s total now %i", obj, self._total)
109 obj._cache_priority = None
111 def touch(self, obj):
113 if obj._cache_priority in self._entries:
114 self._remove(obj, False)
116 _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
def unmanage(self, obj):
    """Stop tracking *obj* in the cache, if it is currently tracked.

    Nothing happens unless obj.persisted() is true and obj._cache_priority
    is a key of self._entries.  Otherwise the object is handed to
    self._remove(obj, True), i.e. removal with clearing requested.
    """
    # persisted() is consulted first; the membership test only runs when
    # it returned a true value.
    if not obj.persisted():
        return
    if obj._cache_priority not in self._entries:
        return
    self._remove(obj, True)
def find(self, uuid):
    """Return the object registered under *uuid* in self._by_uuid.

    Returns None when no object is registered under that uuid.
    """
    cached = self._by_uuid.get(uuid, None)
    return cached
125 class Inodes(object):
126 """Manage the set of inodes. This is the mapping from a numeric id
127 to a concrete File or Directory object"""
129 def __init__(self, inode_cache=256*1024*1024):
131 self._counter = itertools.count(llfuse.ROOT_INODE)
132 self.cache = InodeCache(cap=inode_cache)
134 def __getitem__(self, item):
135 return self._entries[item]
137 def __setitem__(self, key, item):
138 self._entries[key] = item
141 return self._entries.iterkeys()
144 return self._entries.items()
146 def __contains__(self, k):
147 return k in self._entries
def touch(self, entry):
    """Mark *entry* as just accessed.

    Stamps entry._atime with the current time.time() value and then
    forwards the entry to self.cache.touch().
    """
    now = time.time()
    entry._atime = now
    self.cache.touch(entry)
153 def add_entry(self, entry):
154 entry.inode = next(self._counter)
155 self._entries[entry.inode] = entry
156 self.cache.manage(entry)
159 def del_entry(self, entry):
160 if entry.ref_count == 0:
161 _logger.warn("Deleting inode %i", entry.inode)
162 self.cache.unmanage(entry)
163 llfuse.invalidate_inode(entry.inode)
164 del self._entries[entry.inode]
166 _logger.warn("Inode %i has refcount %i", entry.inode, entry.ref_count)
169 def catch_exceptions(orig_func):
170 @functools.wraps(orig_func)
171 def catch_exceptions_wrapper(self, *args, **kwargs):
173 return orig_func(self, *args, **kwargs)
174 except llfuse.FUSEError:
176 except EnvironmentError as e:
177 raise llfuse.FUSEError(e.errno)
179 _logger.exception("Unhandled exception during FUSE operation")
180 raise llfuse.FUSEError(errno.EIO)
182 return catch_exceptions_wrapper
185 class Operations(llfuse.Operations):
186 """This is the main interface with llfuse.
188 The methods on this object are called by llfuse threads to service FUSE
189 events to query and read from the file system.
191 llfuse has its own global lock which is acquired before calling a request handler,
192 so request handlers do not run concurrently unless the lock is explicitly released
193 using 'with llfuse.lock_released:'
197 def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
198 super(Operations, self).__init__()
200 self.inodes = Inodes(inode_cache)
203 self.encoding = encoding
# dict of numeric file handle to its Handle object (FileHandle or DirectoryHandle)
206 self._filehandles = {}
207 self._filehandles_counter = 1
209 # Other threads that need to wait until the fuse driver
210 # is fully initialized should wait() on this event object.
211 self.initlock = threading.Event()
213 self.num_retries = num_retries
218 # Allow threads that are waiting for the driver to be finished
219 # initializing to continue
226 def access(self, inode, mode, ctx):
229 def listen_for_events(self, api_client):
230 self.event = arvados.events.subscribe(api_client,
231 [["event_type", "in", ["create", "update", "delete"]]],
234 def on_event(self, ev):
235 if 'event_type' in ev:
237 item = self.inodes.cache.find(ev["object_uuid"])
242 itemparent = self.inodes.cache.find(ev["object_owner_uuid"])
244 itemparent.invalidate()
248 def getattr(self, inode):
249 if inode not in self.inodes:
250 raise llfuse.FUSEError(errno.ENOENT)
252 e = self.inodes[inode]
254 entry = llfuse.EntryAttributes()
257 entry.entry_timeout = 300
258 entry.attr_timeout = 300
260 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
261 if isinstance(e, Directory):
262 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
264 entry.st_mode |= stat.S_IFREG
265 if isinstance(e, FuseArvadosFile):
266 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
269 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
272 entry.st_uid = self.uid
273 entry.st_gid = self.gid
276 entry.st_size = e.size()
278 entry.st_blksize = 512
279 entry.st_blocks = (e.size()/512)+1
280 entry.st_atime = int(e.atime())
281 entry.st_mtime = int(e.mtime())
282 entry.st_ctime = int(e.mtime())
287 def lookup(self, parent_inode, name):
288 name = unicode(name, self.encoding)
294 if parent_inode in self.inodes:
295 p = self.inodes[parent_inode]
297 inode = p.parent_inode
298 elif isinstance(p, Directory) and name in p:
299 inode = p[name].inode
302 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
303 parent_inode, name, inode)
304 self.inodes[inode].inc_ref()
305 return self.getattr(inode)
307 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
309 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Process a batch of (inode, nlookup) pairs.

    For every pair the matching entry's dec_ref(nlookup) is called.  When
    that call returns 0 and the entry is flagged dead, the entry is handed
    to self.inodes.del_entry().

    inodes -- iterable of (inode number, nlookup) pairs
    """
    for ino, count in inodes:
        _logger.debug("arv-mount forget: %i %i", ino, count)
        entry = self.inodes[ino]
        # dec_ref always runs; 'dead' is only inspected once it returned 0.
        if entry.dec_ref(count) != 0:
            continue
        if entry.dead:
            self.inodes.del_entry(entry)
320 def open(self, inode, flags):
321 if inode in self.inodes:
322 p = self.inodes[inode]
324 raise llfuse.FUSEError(errno.ENOENT)
326 if isinstance(p, Directory):
327 raise llfuse.FUSEError(errno.EISDIR)
329 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
330 raise llfuse.FUSEError(errno.EPERM)
332 fh = self._filehandles_counter
333 self._filehandles_counter += 1
334 self._filehandles[fh] = FileHandle(fh, p)
339 def read(self, fh, off, size):
340 _logger.debug("arv-mount read %i %i %i", fh, off, size)
341 if fh in self._filehandles:
342 handle = self._filehandles[fh]
344 raise llfuse.FUSEError(errno.EBADF)
346 self.inodes.touch(handle.obj)
349 with llfuse.lock_released:
350 return handle.obj.readfrom(off, size, self.num_retries)
351 except arvados.errors.NotFoundError as e:
352 _logger.warning("Block not found: " + str(e))
353 raise llfuse.FUSEError(errno.EIO)
356 def write(self, fh, off, buf):
357 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
358 if fh in self._filehandles:
359 handle = self._filehandles[fh]
361 raise llfuse.FUSEError(errno.EBADF)
363 if not handle.obj.writable():
364 raise llfuse.FUSEError(errno.EPERM)
366 self.inodes.touch(handle.obj)
368 with llfuse.lock_released:
369 return handle.obj.writeto(off, buf, self.num_retries)
372 def release(self, fh):
373 if fh in self._filehandles:
375 self._filehandles[fh].flush()
376 except EnvironmentError as e:
377 raise llfuse.FUSEError(e.errno)
379 _logger.exception("Flush error")
380 self._filehandles[fh].release()
381 del self._filehandles[fh]
382 self.inodes.cache.cap_cache()
384 def releasedir(self, fh):
388 def opendir(self, inode):
389 _logger.debug("arv-mount opendir: inode %i", inode)
391 if inode in self.inodes:
392 p = self.inodes[inode]
394 raise llfuse.FUSEError(errno.ENOENT)
396 if not isinstance(p, Directory):
397 raise llfuse.FUSEError(errno.ENOTDIR)
399 fh = self._filehandles_counter
400 self._filehandles_counter += 1
401 if p.parent_inode in self.inodes:
402 parent = self.inodes[p.parent_inode]
404 raise llfuse.FUSEError(errno.EIO)
409 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
413 def readdir(self, fh, off):
414 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
416 if fh in self._filehandles:
417 handle = self._filehandles[fh]
419 raise llfuse.FUSEError(errno.EBADF)
421 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
424 while e < len(handle.entries):
425 if handle.entries[e][1].inode in self.inodes:
427 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
428 except UnicodeEncodeError:
434 st = llfuse.StatvfsData()
435 st.f_bsize = 64 * 1024
448 def _check_writable(self, inode_parent):
449 if inode_parent in self.inodes:
450 p = self.inodes[inode_parent]
452 raise llfuse.FUSEError(errno.ENOENT)
454 if not isinstance(p, Directory):
455 raise llfuse.FUSEError(errno.ENOTDIR)
458 raise llfuse.FUSEError(errno.EPERM)
460 if not isinstance(p, CollectionDirectoryBase):
461 raise llfuse.FUSEError(errno.EPERM)
466 def create(self, inode_parent, name, mode, flags, ctx):
467 p = self._check_writable(inode_parent)
469 with llfuse.lock_released:
470 p.collection.open(name, "w")
472 # The file entry should have been implicitly created by callback.
474 fh = self._filehandles_counter
475 self._filehandles_counter += 1
476 self._filehandles[fh] = FileHandle(fh, f)
480 return (fh, self.getattr(f.inode))
483 def mkdir(self, inode_parent, name, mode, ctx):
484 p = self._check_writable(inode_parent)
486 with llfuse.lock_released:
487 p.collection.mkdirs(name)
489 # The dir entry should have been implicitly created by callback.
493 return self.getattr(d.inode)
def unlink(self, inode_parent, name):
    """Remove the entry *name* from the directory with inode *inode_parent*.

    The parent is first vetted by self._check_writable(), which raises
    llfuse.FUSEError when the inode is unknown, is not a directory, or is
    not writable.  The removal itself is delegated to the parent's
    collection.
    """
    writable_parent = self._check_writable(inode_parent)

    # The collection call runs with the llfuse lock released.
    with llfuse.lock_released:
        writable_parent.collection.remove(name)
def rmdir(self, inode_parent, name):
    """Remove the directory entry *name* under *inode_parent*.

    Directory removal is handled exactly like file removal: the call is
    passed straight to self.unlink() with the same arguments.
    """
    self.unlink(inode_parent, name)
506 def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
507 src = self._check_writable(inode_parent_old)
508 dest = self._check_writable(inode_parent_new)
510 with llfuse.lock_released:
511 dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
517 if fh in self._filehandles:
518 self._filehandles[fh].flush()
520 def fsync(self, fh, datasync):
523 def fsyncdir(self, fh, datasync):