2 # FUSE driver for Arvados Keep
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
# Logger used for this module's own diagnostics.
_logger = logging.getLogger('arvados.arvados_fuse')

# Attach a stream handler to the 'llfuse' logger and set that logger to DEBUG.
# NOTE(review): this runs unconditionally when the module is imported --
# confirm that always-on llfuse debug output is intended.
llogger = logging.getLogger('llfuse')
log_handler = logging.StreamHandler()
llogger.setLevel(logging.DEBUG)
llogger.addHandler(log_handler)
38 """Connects a numeric file handle to a File or Directory object that has
39 been opened by the client."""
41 def __init__(self, fh, obj):
50 return self.obj.flush()
53 class FileHandle(Handle):
54 """Connects a numeric file handle to a File object that has
55 been opened by the client."""
class DirectoryHandle(Handle):
    """Handle tying a numeric file handle to a Directory object that the
    client has opened, together with the entry list given at creation."""

    def __init__(self, fh, dirobj, entries):
        """Set up the handle.

        fh and dirobj are forwarded unchanged to Handle.__init__();
        entries is stored on the instance as ``self.entries``.
        """
        super(DirectoryHandle, self).__init__(fh, dirobj)
        self.entries = entries
68 class InodeCache(object):
69 def __init__(self, cap, min_entries=4):
70 self._entries = collections.OrderedDict()
72 self._counter = itertools.count(0)
75 self.min_entries = min_entries
80 def _remove(self, obj, clear):
81 if clear and not obj.clear():
82 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
84 self._total -= obj.cache_size
85 del self._entries[obj.cache_priority]
87 del self._by_uuid[obj.cache_uuid]
90 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
94 #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
95 if self._total > self.cap:
96 for key in list(self._entries.keys()):
97 if self._total < self.cap or len(self._entries) < self.min_entries:
99 self._remove(self._entries[key], True)
101 def manage(self, obj):
103 obj.cache_priority = next(self._counter)
104 obj.cache_size = obj.objsize()
105 self._entries[obj.cache_priority] = obj
106 obj.cache_uuid = obj.uuid()
108 self._by_uuid[obj.cache_uuid] = obj
109 self._total += obj.objsize()
110 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
113 obj.cache_priority = None
115 def touch(self, obj):
117 if obj.cache_priority in self._entries:
118 self._remove(obj, False)
def unmanage(self, obj):
    """Stop tracking obj in the cache.

    Nothing happens unless obj reports itself as persisted and its
    cache_priority is currently a key of the entry table; in that case
    the object is passed to _remove() with clear set to True.
    """
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    self._remove(obj, True)
def find(self, uuid):
    """Look up uuid in the by-uuid table and return the object stored
    there, or None when the table has no such key."""
    cached = self._by_uuid.get(uuid)
    return cached
128 class Inodes(object):
129 """Manage the set of inodes. This is the mapping from a numeric id
130 to a concrete File or Directory object"""
132 def __init__(self, inode_cache):
134 self._counter = itertools.count(llfuse.ROOT_INODE)
135 self.inode_cache = inode_cache
def __getitem__(self, item):
    """Return the entry stored in the inode table under item.

    Any lookup error raised by the underlying table propagates to the
    caller.
    """
    entry = self._entries[item]
    return entry
def __setitem__(self, key, item):
    """Bind item to key in the inode table."""
    table = self._entries
    table[key] = item
144 return self._entries.iterkeys()
147 return self._entries.items()
def __contains__(self, k):
    """Report whether k is present in the inode table."""
    present = k in self._entries
    return present
def touch(self, entry):
    """Record an access to entry.

    Sets entry._atime to the current time.time() value, then forwards
    the entry to the inode cache's touch().
    """
    now = time.time()
    entry._atime = now
    self.inode_cache.touch(entry)
156 def add_entry(self, entry):
157 entry.inode = next(self._counter)
158 if entry.inode == llfuse.ROOT_INODE:
160 self._entries[entry.inode] = entry
161 self.inode_cache.manage(entry)
164 def del_entry(self, entry):
165 if entry.ref_count == 0:
166 _logger.debug("Deleting inode %i", entry.inode)
167 self.inode_cache.unmanage(entry)
168 llfuse.invalidate_inode(entry.inode)
169 del self._entries[entry.inode]
173 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
175 def catch_exceptions(orig_func):
176 @functools.wraps(orig_func)
177 def catch_exceptions_wrapper(self, *args, **kwargs):
179 return orig_func(self, *args, **kwargs)
180 except llfuse.FUSEError:
182 except EnvironmentError as e:
183 raise llfuse.FUSEError(e.errno)
185 _logger.exception("Unhandled exception during FUSE operation")
186 raise llfuse.FUSEError(errno.EIO)
188 return catch_exceptions_wrapper
191 class Operations(llfuse.Operations):
192 """This is the main interface with llfuse.
194 The methods on this object are called by llfuse threads to service FUSE
195 events to query and read from the file system.
197 llfuse has its own global lock which is acquired before calling a request handler,
198 so request handlers do not run concurrently unless the lock is explicitly released
199 using 'with llfuse.lock_released:'
203 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
204 super(Operations, self).__init__()
207 inode_cache = InodeCache(cap=256*1024*1024)
208 self.inodes = Inodes(inode_cache)
211 self.encoding = encoding
213 # dict of inode to filehandle
214 self._filehandles = {}
215 self._filehandles_counter = itertools.count(0)
217 # Other threads that need to wait until the fuse driver
218 # is fully initialized should wait() on this event object.
219 self.initlock = threading.Event()
221 self.num_retries = num_retries
226 # Allow threads that are waiting for the driver to be finished
227 # initializing to continue
234 def access(self, inode, mode, ctx):
237 def listen_for_events(self, api_client):
238 self.event = arvados.events.subscribe(api_client,
239 [["event_type", "in", ["create", "update", "delete"]]],
242 def on_event(self, ev):
243 if 'event_type' in ev:
245 item = self.inodes.inode_cache.find(ev["object_uuid"])
250 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
251 if itemparent is not None:
252 itemparent.invalidate()
256 def getattr(self, inode):
257 if inode not in self.inodes:
258 raise llfuse.FUSEError(errno.ENOENT)
260 e = self.inodes[inode]
262 entry = llfuse.EntryAttributes()
265 entry.entry_timeout = 300
266 entry.attr_timeout = 300
268 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
269 if isinstance(e, Directory):
270 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
272 entry.st_mode |= stat.S_IFREG
273 if isinstance(e, FuseArvadosFile):
274 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
277 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
280 entry.st_uid = self.uid
281 entry.st_gid = self.gid
284 entry.st_size = e.size()
286 entry.st_blksize = 512
287 entry.st_blocks = (e.size()/512)+1
288 entry.st_atime = int(e.atime())
289 entry.st_mtime = int(e.mtime())
290 entry.st_ctime = int(e.mtime())
295 def setattr(self, inode, attr):
296 entry = self.getattr(inode)
298 e = self.inodes[inode]
300 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
301 with llfuse.lock_released:
302 e.arvfile.truncate(attr.st_size)
303 entry.st_size = e.arvfile.size()
308 def lookup(self, parent_inode, name):
309 name = unicode(name, self.encoding)
315 if parent_inode in self.inodes:
316 p = self.inodes[parent_inode]
318 inode = p.parent_inode
319 elif isinstance(p, Directory) and name in p:
320 inode = p[name].inode
323 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
324 parent_inode, name, inode)
325 self.inodes[inode].inc_ref()
326 return self.getattr(inode)
328 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
330 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Process a forget request for a batch of inodes.

    inodes is an iterable of (inode, nlookup) pairs.  For each pair the
    entry is fetched from self.inodes, the request is logged, and
    dec_ref(nlookup) is called on the entry.  When that call returns 0
    and the entry's ``dead`` attribute is true, the entry is passed to
    self.inodes.del_entry().
    """
    for inode_num, lookups in inodes:
        entry = self.inodes[inode_num]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode_num, lookups, entry.ref_count)
        remaining = entry.dec_ref(lookups)
        if remaining == 0 and entry.dead:
            self.inodes.del_entry(entry)
341 def open(self, inode, flags):
342 if inode in self.inodes:
343 p = self.inodes[inode]
345 raise llfuse.FUSEError(errno.ENOENT)
347 if isinstance(p, Directory):
348 raise llfuse.FUSEError(errno.EISDIR)
350 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
351 raise llfuse.FUSEError(errno.EPERM)
353 fh = next(self._filehandles_counter)
354 self._filehandles[fh] = FileHandle(fh, p)
359 def read(self, fh, off, size):
360 _logger.debug("arv-mount read %i %i %i", fh, off, size)
361 if fh in self._filehandles:
362 handle = self._filehandles[fh]
364 raise llfuse.FUSEError(errno.EBADF)
366 self.inodes.touch(handle.obj)
369 with llfuse.lock_released:
370 return handle.obj.readfrom(off, size, self.num_retries)
371 except arvados.errors.NotFoundError as e:
372 _logger.warning("Block not found: " + str(e))
373 raise llfuse.FUSEError(errno.EIO)
376 def write(self, fh, off, buf):
377 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
378 if fh in self._filehandles:
379 handle = self._filehandles[fh]
381 raise llfuse.FUSEError(errno.EBADF)
383 if not handle.obj.writable():
384 raise llfuse.FUSEError(errno.EPERM)
386 self.inodes.touch(handle.obj)
388 with llfuse.lock_released:
389 return handle.obj.writeto(off, buf, self.num_retries)
392 def release(self, fh):
393 if fh in self._filehandles:
395 self._filehandles[fh].flush()
396 except EnvironmentError as e:
397 raise llfuse.FUSEError(e.errno)
399 _logger.exception("Flush error")
400 self._filehandles[fh].release()
401 del self._filehandles[fh]
402 self.inodes.inode_cache.cap_cache()
404 def releasedir(self, fh):
408 def opendir(self, inode):
409 _logger.debug("arv-mount opendir: inode %i", inode)
411 if inode in self.inodes:
412 p = self.inodes[inode]
414 raise llfuse.FUSEError(errno.ENOENT)
416 if not isinstance(p, Directory):
417 raise llfuse.FUSEError(errno.ENOTDIR)
419 fh = next(self._filehandles_counter)
420 if p.parent_inode in self.inodes:
421 parent = self.inodes[p.parent_inode]
423 raise llfuse.FUSEError(errno.EIO)
428 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
432 def readdir(self, fh, off):
433 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
435 if fh in self._filehandles:
436 handle = self._filehandles[fh]
438 raise llfuse.FUSEError(errno.EBADF)
440 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
443 while e < len(handle.entries):
444 if handle.entries[e][1].inode in self.inodes:
446 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
447 except UnicodeEncodeError:
453 st = llfuse.StatvfsData()
454 st.f_bsize = 64 * 1024
467 def _check_writable(self, inode_parent):
468 if inode_parent in self.inodes:
469 p = self.inodes[inode_parent]
471 raise llfuse.FUSEError(errno.ENOENT)
473 if not isinstance(p, Directory):
474 raise llfuse.FUSEError(errno.ENOTDIR)
477 raise llfuse.FUSEError(errno.EPERM)
482 def create(self, inode_parent, name, mode, flags, ctx):
483 p = self._check_writable(inode_parent)
486 # The file entry should have been implicitly created by callback.
488 fh = next(self._filehandles_counter)
489 self._filehandles[fh] = FileHandle(fh, f)
493 return (fh, self.getattr(f.inode))
496 def mkdir(self, inode_parent, name, mode, ctx):
497 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
499 p = self._check_writable(inode_parent)
502 # The dir entry should have been implicitly created by callback.
506 return self.getattr(d.inode)
509 def unlink(self, inode_parent, name):
510 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
511 p = self._check_writable(inode_parent)
515 def rmdir(self, inode_parent, name):
516 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
517 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Rename or move a directory entry.

    Both parent inodes are passed through _check_writable(), the old
    parent first and then the new one.  rename(name_old, name_new, src)
    is then called on the result for the new parent, where src is the
    result for the old parent.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
529 if fh in self._filehandles:
530 self._filehandles[fh].flush()
532 def fsync(self, fh, datasync):
535 def fsyncdir(self, fh, datasync):