2 # FUSE driver for Arvados Keep
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
# Logger used by this module.
_logger = logging.getLogger('arvados.arvados_fuse')

# Attach a stream handler to llfuse's own logger and let every record
# (DEBUG and above) through.
llogger = logging.getLogger('llfuse')
log_handler = logging.StreamHandler()
llogger.setLevel(logging.DEBUG)
llogger.addHandler(log_handler)
38 """Connects a numeric file handle to a File or Directory object that has
39 been opened by the client."""
41 def __init__(self, fh, obj):
50 with llfuse.lock_released:
51 return self.obj.flush()
class FileHandle(Handle):
    """Handle for an open file.

    Ties the numeric file handle given to the client to the File object
    that the client has opened.
    """
class DirectoryHandle(Handle):
    """Handle for an open directory.

    Ties the numeric file handle given to the client to the Directory
    object that the client has opened, and carries the list of entries
    that was supplied when the handle was created.
    """

    def __init__(self, fh, dirobj, entries):
        Handle.__init__(self, fh, dirobj)
        # Sequence of (name, object) pairs; readdir indexes into it by offset.
        self.entries = entries
class InodeCache(object):
    """Size-capped cache of inode objects, evicting least recently touched first.

    Managed objects must provide ``persisted()``, ``objsize()``, ``uuid()``,
    ``clear()`` and ``in_use()``.  The cache records three attributes on
    every object it manages:

    * ``cache_priority`` -- value of a monotonic counter (lower is older),
      or None when the object is not persisted and therefore not cached;
    * ``cache_size`` -- the size charged against the cap for this object;
    * ``cache_uuid`` -- the uuid under which the object can be found.

    Arguments:
    cap -- upper bound for the summed ``cache_size`` of managed objects.
    min_entries -- eviction stops once fewer than this many entries remain.
    """

    def __init__(self, cap, min_entries=4):
        # Insertion ordered, so iteration yields the oldest priority first.
        self._entries = collections.OrderedDict()
        self._by_uuid = {}
        self._counter = itertools.count(1)
        self.cap = cap
        self._total = 0
        self.min_entries = min_entries

    def total(self):
        """Return the summed cache_size of all managed objects."""
        return self._total

    def _remove(self, obj, clear):
        """Drop obj from the cache and return True if it was removed.

        When clear is true, obj.clear() is called first; if that reports
        failure the object stays in the cache and False is returned.
        """
        if clear and not obj.clear():
            _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
            return False
        self._total -= obj.cache_size
        del self._entries[obj.cache_priority]
        if obj.cache_uuid:
            # Two managed objects may share a uuid, in which case the index
            # points at whichever was managed last.  Only drop the index
            # entry when it refers to this object, so removing the other
            # one later does not fail with KeyError.
            if self._by_uuid.get(obj.cache_uuid) is obj:
                del self._by_uuid[obj.cache_uuid]
            obj.cache_uuid = None
        _logger.debug("Cleared %s total now %i", obj, self._total)
        return True

    def cap_cache(self):
        """Evict the oldest entries until the total is back within cap."""
        _logger.debug("total is %i cap is %i", self._total, self.cap)
        if self._total > self.cap:
            # Iterate over a snapshot because _remove() mutates _entries.
            for key in list(self._entries):
                # Stop as soon as the cache fits (total == cap is a fit,
                # matching the '>' trigger above) or too few entries remain.
                if self._total <= self.cap or len(self._entries) < self.min_entries:
                    break
                self._remove(self._entries[key], True)

    def manage(self, obj):
        """Start tracking obj if it is persisted, then enforce the cap."""
        if obj.persisted():
            obj.cache_priority = next(self._counter)
            obj.cache_size = obj.objsize()
            self._entries[obj.cache_priority] = obj
            obj.cache_uuid = obj.uuid()
            if obj.cache_uuid:
                self._by_uuid[obj.cache_uuid] = obj
            # Charge exactly what _remove() will later refund, even if
            # objsize() would report a different value on a second call.
            self._total += obj.cache_size
            _logger.debug("Managing %s total now %i", obj, self._total)
            self.cap_cache()
        else:
            obj.cache_priority = None

    def touch(self, obj):
        """Mark obj as most recently used by re-inserting it with a new priority."""
        if obj.persisted():
            if obj.cache_priority in self._entries:
                self._remove(obj, False)
            self.manage(obj)
            _logger.debug("Touched %s (%i) total now %i", obj, obj.cache_size, self._total)

    def unmanage(self, obj):
        """Clear obj and stop tracking it, if it is currently cached."""
        if obj.persisted() and obj.cache_priority in self._entries:
            self._remove(obj, True)

    def find(self, uuid):
        """Return the managed object indexed under uuid, or None."""
        return self._by_uuid.get(uuid)
class Inodes(object):
    """Manage the set of inodes.  This is the mapping from a numeric id
    to a concrete File or Directory object.

    Inode numbers are handed out from a counter that starts at
    llfuse.ROOT_INODE.  Every entry added is also registered with the
    supplied inode cache.
    """

    def __init__(self, inode_cache):
        self._entries = {}
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.inode_cache = inode_cache

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        # iter() on a dict yields its keys on both Python 2 and Python 3;
        # dict.iterkeys() only exists on Python 2.
        return iter(self._entries)

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def touch(self, entry):
        """Record the current time as the entry's access time and refresh
        its position in the inode cache."""
        entry._atime = time.time()
        self.inode_cache.touch(entry)

    def add_entry(self, entry):
        """Assign the next inode number to entry, register it, and return it."""
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        self.inode_cache.manage(entry)
        return entry

    def del_entry(self, entry):
        """Remove entry if nothing references it; otherwise mark it dead.

        A dead entry is deleted later by Operations.forget() once its
        reference count drops to zero.
        """
        if entry.ref_count == 0:
            _logger.warning("Deleting inode %i", entry.inode)
            self.inode_cache.unmanage(entry)
            llfuse.invalidate_inode(entry.inode)
            del self._entries[entry.inode]
        else:
            _logger.warning("Inode %i has refcount %i", entry.inode, entry.ref_count)
            entry.dead = True
def catch_exceptions(orig_func):
    """Decorator that turns exceptions from a FUSE request handler into
    llfuse.FUSEError.

    * llfuse.FUSEError is re-raised unchanged.
    * EnvironmentError becomes FUSEError carrying the same errno, or EIO
      when the exception has no errno.
    * Any other exception is logged with its traceback and becomes
      FUSEError(EIO).
    """
    @functools.wraps(orig_func)
    def catch_exceptions_wrapper(self, *args, **kwargs):
        try:
            return orig_func(self, *args, **kwargs)
        except llfuse.FUSEError:
            raise
        except EnvironmentError as e:
            # errno is None when the exception was constructed from a bare
            # message, e.g. IOError("boom"); report a generic I/O error then.
            raise llfuse.FUSEError(e.errno if e.errno is not None else errno.EIO)
        except Exception:
            _logger.exception("Unhandled exception during FUSE operation")
            raise llfuse.FUSEError(errno.EIO)

    return catch_exceptions_wrapper
189 class Operations(llfuse.Operations):
190 """This is the main interface with llfuse.
192 The methods on this object are called by llfuse threads to service FUSE
193 events to query and read from the file system.
195 llfuse has its own global lock which is acquired before calling a request handler,
196 so request handlers do not run concurrently unless the lock is explicitly released
197 using 'with llfuse.lock_released:'
def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
    """Set up the state shared by all request handlers.

    uid, gid -- owner ids reported for every entry by getattr().
    encoding -- encoding used to convert names to and from unicode.
    inode_cache -- InodeCache to use; a 256 MiB cache is created when omitted.
    num_retries -- passed through to file reads and writes.
    """
    super(Operations, self).__init__()

    if inode_cache is None:
        inode_cache = InodeCache(cap=256*1024*1024)
    self.inodes = Inodes(inode_cache)
    self.uid = uid
    self.gid = gid
    self.encoding = encoding

    # Open handles, keyed by the numeric file handle given to the client.
    self._filehandles = {}
    self._filehandles_counter = 1

    # Other threads that need to wait until the fuse driver
    # is fully initialized should wait() on this event object.
    self.initlock = threading.Event()

    self.num_retries = num_retries
224 # Allow threads that are waiting for the driver to be finished
225 # initializing to continue
232 def access(self, inode, mode, ctx):
def listen_for_events(self, api_client):
    """Subscribe to create, update and delete events and keep the
    subscription on self.event."""
    event_filters = [["event_type", "in", ["create", "update", "delete"]]]
    # NOTE(review): the callback argument is not visible in this excerpt;
    # on_event is presumed to be the handler -- confirm.
    self.event = arvados.events.subscribe(api_client, event_filters, self.on_event)
240 def on_event(self, ev):
241 if 'event_type' in ev:
243 item = self.inodes.inode_cache.find(ev["object_uuid"])
248 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
250 itemparent.invalidate()
def getattr(self, inode):
    """Return the llfuse.EntryAttributes describing inode.

    Raises llfuse.FUSEError(ENOENT) when the inode is not known.
    """
    if inode not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)

    e = self.inodes[inode]
    size = e.size()
    mtime = int(e.mtime())

    entry = llfuse.EntryAttributes()
    entry.st_ino = inode
    entry.generation = 0
    entry.entry_timeout = 300
    entry.attr_timeout = 300

    # Everything is readable by everyone.
    entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    if isinstance(e, Directory):
        entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
    else:
        entry.st_mode |= stat.S_IFREG
        if isinstance(e, FuseArvadosFile):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    if e.writable():
        entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

    entry.st_nlink = 1
    entry.st_uid = self.uid
    entry.st_gid = self.gid
    entry.st_rdev = 0

    entry.st_size = size
    entry.st_blksize = 512
    # Floor division keeps this an integer even where '/' is true division.
    entry.st_blocks = (size // 512) + 1
    entry.st_atime = int(e.atime())
    entry.st_mtime = mtime
    # No separate change time is tracked; the modification time is reported.
    entry.st_ctime = mtime

    return entry
def lookup(self, parent_inode, name):
    """Resolve name inside the directory parent_inode.

    On success the target's reference count is incremented and its
    attributes are returned; otherwise llfuse.FUSEError(ENOENT) is raised.
    """
    name = unicode(name, self.encoding)
    found = None

    if name == '.':
        found = parent_inode
    elif parent_inode in self.inodes:
        parent = self.inodes[parent_inode]
        if name == '..':
            found = parent.parent_inode
        elif isinstance(parent, Directory) and name in parent:
            found = parent[name].inode

    if found is None:
        _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
                      parent_inode, name)
        raise llfuse.FUSEError(errno.ENOENT)

    _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                  parent_inode, name, found)
    self.inodes[found].inc_ref()
    return self.getattr(found)
def forget(self, inodes):
    """Drop lookup references for a batch of (inode, nlookup) pairs.

    An entry whose reference count reaches zero and which has been marked
    dead is removed from the inode table.  Inodes that are not in the table
    are skipped, so one stale inode does not prevent the rest of the batch
    from being processed.
    """
    for inode, nlookup in inodes:
        _logger.debug("arv-mount forget: %i %i", inode, nlookup)
        if inode not in self.inodes:
            _logger.debug("arv-mount forget: inode %i is not known, skipping", inode)
            continue
        ent = self.inodes[inode]
        if ent.dec_ref(nlookup) == 0 and ent.dead:
            self.inodes.del_entry(ent)
def open(self, inode, flags):
    """Open the file at inode and return a new numeric file handle.

    Raises llfuse.FUSEError with ENOENT for an unknown inode, EISDIR for a
    directory, and EPERM when write access is requested on an object that
    is not writable.
    """
    if inode not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)
    target = self.inodes[inode]

    if isinstance(target, Directory):
        raise llfuse.FUSEError(errno.EISDIR)

    wants_write = flags & (os.O_WRONLY | os.O_RDWR)
    if wants_write and not target.writable():
        raise llfuse.FUSEError(errno.EPERM)

    fh = self._filehandles_counter
    self._filehandles_counter = fh + 1
    self._filehandles[fh] = FileHandle(fh, target)
    self.inodes.touch(target)
    return fh
def read(self, fh, off, size):
    """Read up to size bytes at offset off from the file open on handle fh.

    Raises llfuse.FUSEError(EBADF) for an unknown handle and
    llfuse.FUSEError(EIO) when the underlying read reports NotFoundError.
    """
    _logger.debug("arv-mount read %i %i %i", fh, off, size)
    if fh not in self._filehandles:
        raise llfuse.FUSEError(errno.EBADF)
    handle = self._filehandles[fh]

    self.inodes.touch(handle.obj)

    try:
        # Perform the read with the llfuse global lock released so other
        # request handlers can run meanwhile.
        with llfuse.lock_released:
            return handle.obj.readfrom(off, size, self.num_retries)
    except arvados.errors.NotFoundError as e:
        _logger.warning("Block not found: " + str(e))
        raise llfuse.FUSEError(errno.EIO)
def write(self, fh, off, buf):
    """Write buf at offset off to the file open on handle fh and return
    whatever the file object's writeto() returns.

    Raises llfuse.FUSEError(EBADF) for an unknown handle and
    llfuse.FUSEError(EPERM) when the file is not writable.
    """
    _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
    if fh not in self._filehandles:
        raise llfuse.FUSEError(errno.EBADF)
    handle = self._filehandles[fh]

    if not handle.obj.writable():
        raise llfuse.FUSEError(errno.EPERM)

    self.inodes.touch(handle.obj)

    # Perform the write with the llfuse global lock released.
    with llfuse.lock_released:
        return handle.obj.writeto(off, buf, self.num_retries)
def release(self, fh):
    """Flush and close the handle fh, then re-apply the inode cache cap.

    The handle is always released and removed from the handle table, even
    when flushing fails.  An EnvironmentError from the flush is reported as
    llfuse.FUSEError with the same errno; any other flush error is logged
    and otherwise ignored.  Unknown handles are ignored.
    """
    if fh in self._filehandles:
        handle = self._filehandles[fh]
        try:
            handle.flush()
        except EnvironmentError as e:
            raise llfuse.FUSEError(e.errno)
        except Exception:
            _logger.exception("Flush error")
        finally:
            # Runs on every path so a failed flush cannot leak the handle.
            del self._filehandles[fh]
            handle.release()
    self.inodes.inode_cache.cap_cache()
390 def releasedir(self, fh):
def opendir(self, inode):
    """Open the directory at inode and return a new numeric handle.

    The handle captures the directory listing at open time: '.' and '..'
    followed by the directory's own items.  Raises llfuse.FUSEError with
    ENOENT for an unknown inode, ENOTDIR when the inode is not a directory,
    and EIO when the directory's parent inode is unknown.
    """
    _logger.debug("arv-mount opendir: inode %i", inode)

    if inode not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)
    directory = self.inodes[inode]

    if not isinstance(directory, Directory):
        raise llfuse.FUSEError(errno.ENOTDIR)

    fh = self._filehandles_counter
    self._filehandles_counter = fh + 1
    if directory.parent_inode not in self.inodes:
        raise llfuse.FUSEError(errno.EIO)
    parent = self.inodes[directory.parent_inode]

    # update atime
    self.inodes.touch(directory)

    listing = [('.', directory), ('..', parent)]
    listing.extend(directory.items())
    self._filehandles[fh] = DirectoryHandle(fh, directory, listing)
    return fh
def readdir(self, fh, off):
    """Yield (encoded name, attributes, next offset) for the entries of the
    directory handle fh, starting at index off.

    Entries whose inode is no longer known, or whose name cannot be
    encoded, are skipped.  Raises llfuse.FUSEError(EBADF) for an unknown
    handle.
    """
    _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

    if fh not in self._filehandles:
        raise llfuse.FUSEError(errno.EBADF)
    handle = self._filehandles[fh]

    _logger.debug("arv-mount handle.dirobj %s", handle.obj)

    idx = off
    while idx < len(handle.entries):
        entry_name, entry_obj = handle.entries[idx][0], handle.entries[idx][1]
        if entry_obj.inode in self.inodes:
            try:
                yield (entry_name.encode(self.encoding), self.getattr(entry_obj.inode), idx+1)
            except UnicodeEncodeError:
                pass
        idx += 1
440 st = llfuse.StatvfsData()
441 st.f_bsize = 64 * 1024
def _check_writable(self, inode_parent):
    """Return the directory at inode_parent if entries may be changed in it.

    Raises llfuse.FUSEError with ENOENT for an unknown inode, ENOTDIR when
    it is not a directory, and EPERM when the directory is not writable or
    is not a CollectionDirectoryBase.
    """
    if inode_parent not in self.inodes:
        raise llfuse.FUSEError(errno.ENOENT)
    parent = self.inodes[inode_parent]

    if not isinstance(parent, Directory):
        raise llfuse.FUSEError(errno.ENOTDIR)

    if not (parent.writable() and isinstance(parent, CollectionDirectoryBase)):
        raise llfuse.FUSEError(errno.EPERM)

    return parent
def create(self, inode_parent, name, mode, flags, ctx):
    """Create the file name inside the writable directory inode_parent.

    Returns (file handle, attributes) for the new file.
    """
    parent = self._check_writable(inode_parent)

    # Create the file in the collection with the llfuse global lock released.
    with llfuse.lock_released:
        parent.collection.open(name, "w")

    # The file entry should have been implicitly created by callback.
    new_file = parent[name]
    fh = self._filehandles_counter
    self._filehandles_counter = fh + 1
    self._filehandles[fh] = FileHandle(fh, new_file)
    self.inodes.touch(parent)

    new_file.inc_ref()
    return (fh, self.getattr(new_file.inode))
def mkdir(self, inode_parent, name, mode, ctx):
    """Create the directory name inside the writable directory
    inode_parent and return the new directory's attributes."""
    parent = self._check_writable(inode_parent)

    # Create the directory in the collection with the llfuse global lock
    # released.
    with llfuse.lock_released:
        parent.collection.mkdirs(name)

    # The dir entry should have been implicitly created by callback.
    new_dir = parent[name]
    self.inodes.touch(parent)

    new_dir.inc_ref()
    return self.getattr(new_dir.inode)
def unlink(self, inode_parent, name):
    """Remove the entry name from the writable directory inode_parent."""
    parent = self._check_writable(inode_parent)

    # Remove from the collection with the llfuse global lock released.
    with llfuse.lock_released:
        parent.collection.remove(name)
def rmdir(self, inode_parent, name):
    """Remove the directory name from inode_parent.

    Delegates to unlink(), which removes the named entry from the
    collection.
    """
    self.unlink(inode_parent=inode_parent, name=name)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move name_old in directory inode_parent_old to name_new in directory
    inode_parent_new, overwriting an existing target.

    Both directories must pass _check_writable().
    """
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)

    # Rename within/between collections with the llfuse global lock released.
    with llfuse.lock_released:
        target_dir.collection.rename(
            name_old,
            name_new,
            source_collection=source_dir.collection,
            overwrite=True)
523 if fh in self._filehandles:
524 self._filehandles[fh].flush()
526 def fsync(self, fh, datasync):
529 def fsyncdir(self, fh, datasync):