2 # FUSE driver for Arvados Keep
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
_logger = logging.getLogger('arvados.arvados_fuse')

# Attach a StreamHandler (which writes to sys.stderr by default) to the
# 'llfuse' logger and let that logger emit everything down to DEBUG.
# NOTE(review): this configures a third-party logger as an import-time side
# effect; consider moving it to the command-line entry point instead.
llogger = logging.getLogger('llfuse')
log_handler = logging.StreamHandler()
llogger.addHandler(log_handler)
llogger.setLevel(logging.DEBUG)
38 """Connects a numeric file handle to a File or Directory object that has
39 been opened by the client."""
41 def __init__(self, fh, obj):
50 return self.obj.flush()
class FileHandle(Handle):
    """Handle type for a File object the client has opened.

    Everything (the numeric handle, the wrapped object, flush/release) is
    inherited from Handle; this subclass adds no behavior of its own.
    """
class DirectoryHandle(Handle):
    """Handle type for a Directory object the client has opened.

    In addition to what Handle stores, it keeps ``entries``: the sequence of
    (name, object) pairs captured when the directory was opened (opendir
    builds it as '.', '..' plus the directory's items), which readdir then
    walks by offset.
    """

    def __init__(self, fh, dirobj, entries):
        base = super(DirectoryHandle, self)
        base.__init__(fh, dirobj)
        self.entries = entries
68 class InodeCache(object):
69 def __init__(self, cap, min_entries=4):
70 self._entries = collections.OrderedDict()
72 self._counter = itertools.count(0)
75 self.min_entries = min_entries
80 def _remove(self, obj, clear):
81 if clear and not obj.clear():
82 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
84 self._total -= obj.cache_size
85 del self._entries[obj.cache_priority]
87 del self._by_uuid[obj.cache_uuid]
90 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
94 #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
95 if self._total > self.cap:
96 for key in list(self._entries.keys()):
97 if self._total < self.cap or len(self._entries) < self.min_entries:
99 self._remove(self._entries[key], True)
101 def manage(self, obj):
103 obj.cache_priority = next(self._counter)
104 obj.cache_size = obj.objsize()
105 self._entries[obj.cache_priority] = obj
106 obj.cache_uuid = obj.uuid()
108 self._by_uuid[obj.cache_uuid] = obj
109 self._total += obj.objsize()
110 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
113 obj.cache_priority = None
115 def touch(self, obj):
117 if obj.cache_priority in self._entries:
118 self._remove(obj, False)
def unmanage(self, obj):
    """Remove *obj* from the cache's bookkeeping, asking it to clear itself.

    Nothing happens unless obj.persisted() is true AND obj.cache_priority
    is currently a key of the cache's entry table. Otherwise _remove is
    called with clear=True.
    """
    if not obj.persisted():
        return
    if obj.cache_priority not in self._entries:
        return
    self._remove(obj, True)
def find(self, uuid):
    """Look up a cached object by its cache uuid; return None when unknown."""
    by_uuid = self._by_uuid
    return by_uuid.get(uuid, None)
128 class Inodes(object):
129 """Manage the set of inodes. This is the mapping from a numeric id
130 to a concrete File or Directory object"""
132 def __init__(self, inode_cache):
134 self._counter = itertools.count(llfuse.ROOT_INODE)
135 self.inode_cache = inode_cache
def __getitem__(self, item):
    """Return the object registered under inode number *item*.

    Raises KeyError when no object is registered under that inode.
    """
    table = self._entries
    return table[item]
def __setitem__(self, key, item):
    """Register *item* under inode number *key*, replacing any existing entry."""
    table = self._entries
    table[key] = item
144 return self._entries.iterkeys()
147 return self._entries.items()
def __contains__(self, k):
    """Report whether some object is registered under inode number *k*."""
    present = k in self._entries
    return present
def touch(self, entry):
    """Record an access to *entry*.

    Stamps entry._atime with the current wall-clock time, then forwards
    the entry to the inode cache's own touch().
    """
    now = time.time()
    entry._atime = now
    cache = self.inode_cache
    cache.touch(entry)
156 def add_entry(self, entry):
157 entry.inode = next(self._counter)
158 if entry.inode == llfuse.ROOT_INODE:
160 self._entries[entry.inode] = entry
161 self.inode_cache.manage(entry)
164 def del_entry(self, entry):
165 if entry.ref_count == 0:
166 _logger.debug("Deleting inode %i", entry.inode)
167 self.inode_cache.unmanage(entry)
168 llfuse.invalidate_inode(entry.inode)
169 del self._entries[entry.inode]
173 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
175 def catch_exceptions(orig_func):
176 @functools.wraps(orig_func)
177 def catch_exceptions_wrapper(self, *args, **kwargs):
179 return orig_func(self, *args, **kwargs)
180 except llfuse.FUSEError:
182 except EnvironmentError as e:
183 raise llfuse.FUSEError(e.errno)
185 _logger.exception("Unhandled exception during FUSE operation")
186 raise llfuse.FUSEError(errno.EIO)
188 return catch_exceptions_wrapper
191 class Operations(llfuse.Operations):
192 """This is the main interface with llfuse.
194 The methods on this object are called by llfuse threads to service FUSE
195 events to query and read from the file system.
197 llfuse has its own global lock which is acquired before calling a request handler,
198 so request handlers do not run concurrently unless the lock is explicitly released
199 using 'with llfuse.lock_released:'
203 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
204 super(Operations, self).__init__()
207 inode_cache = InodeCache(cap=256*1024*1024)
208 self.inodes = Inodes(inode_cache)
211 self.encoding = encoding
213 # dict of inode to filehandle
214 self._filehandles = {}
215 self._filehandles_counter = itertools.count(0)
217 # Other threads that need to wait until the fuse driver
218 # is fully initialized should wait() on this event object.
219 self.initlock = threading.Event()
221 self.num_retries = num_retries
226 # Allow threads that are waiting for the driver to be finished
227 # initializing to continue
234 def access(self, inode, mode, ctx):
237 def listen_for_events(self, api_client):
238 self.event = arvados.events.subscribe(api_client,
239 [["event_type", "in", ["create", "update", "delete"]]],
242 def on_event(self, ev):
243 if 'event_type' in ev:
245 item = self.inodes.inode_cache.find(ev["object_uuid"])
250 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
251 if itemparent is not None:
252 itemparent.invalidate()
256 def getattr(self, inode):
257 if inode not in self.inodes:
258 raise llfuse.FUSEError(errno.ENOENT)
260 e = self.inodes[inode]
262 entry = llfuse.EntryAttributes()
265 entry.entry_timeout = 300
266 entry.attr_timeout = 300
268 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
269 if isinstance(e, Directory):
270 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
272 entry.st_mode |= stat.S_IFREG
273 if isinstance(e, FuseArvadosFile):
274 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
277 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
280 entry.st_uid = self.uid
281 entry.st_gid = self.gid
284 entry.st_size = e.size()
286 entry.st_blksize = 512
287 entry.st_blocks = (e.size()/512)+1
288 entry.st_atime = int(e.atime())
289 entry.st_mtime = int(e.mtime())
290 entry.st_ctime = int(e.mtime())
295 def lookup(self, parent_inode, name):
296 name = unicode(name, self.encoding)
302 if parent_inode in self.inodes:
303 p = self.inodes[parent_inode]
305 inode = p.parent_inode
306 elif isinstance(p, Directory) and name in p:
307 inode = p[name].inode
310 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
311 parent_inode, name, inode)
312 self.inodes[inode].inc_ref()
313 return self.getattr(inode)
315 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
317 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release kernel lookup references for a batch of inodes.

    :param inodes: iterable of (inode, nlookup) pairs.

    For each known inode, the entry's dec_ref(nlookup) is called. If that
    returns 0 and the entry is marked dead, the entry is removed via
    self.inodes.del_entry().

    Inodes that are not (or no longer) in the inode table are skipped with a
    debug log instead of raising KeyError. Raising would abandon the rest of
    the batch, so the remaining entries would never have dec_ref called and
    their references would leak.
    """
    for inode, nlookup in inodes:
        if inode not in self.inodes:
            _logger.debug("arv-mount forget: inode %i nlookup %i not found", inode, nlookup)
            continue
        ent = self.inodes[inode]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
        if ent.dec_ref(nlookup) == 0 and ent.dead:
            self.inodes.del_entry(ent)
328 def open(self, inode, flags):
329 if inode in self.inodes:
330 p = self.inodes[inode]
332 raise llfuse.FUSEError(errno.ENOENT)
334 if isinstance(p, Directory):
335 raise llfuse.FUSEError(errno.EISDIR)
337 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
338 raise llfuse.FUSEError(errno.EPERM)
340 fh = next(self._filehandles_counter)
341 self._filehandles[fh] = FileHandle(fh, p)
346 def read(self, fh, off, size):
347 _logger.debug("arv-mount read %i %i %i", fh, off, size)
348 if fh in self._filehandles:
349 handle = self._filehandles[fh]
351 raise llfuse.FUSEError(errno.EBADF)
353 self.inodes.touch(handle.obj)
356 with llfuse.lock_released:
357 return handle.obj.readfrom(off, size, self.num_retries)
358 except arvados.errors.NotFoundError as e:
359 _logger.warning("Block not found: " + str(e))
360 raise llfuse.FUSEError(errno.EIO)
363 def write(self, fh, off, buf):
364 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
365 if fh in self._filehandles:
366 handle = self._filehandles[fh]
368 raise llfuse.FUSEError(errno.EBADF)
370 if not handle.obj.writable():
371 raise llfuse.FUSEError(errno.EPERM)
373 self.inodes.touch(handle.obj)
375 with llfuse.lock_released:
376 return handle.obj.writeto(off, buf, self.num_retries)
379 def release(self, fh):
380 if fh in self._filehandles:
382 self._filehandles[fh].flush()
383 except EnvironmentError as e:
384 raise llfuse.FUSEError(e.errno)
386 _logger.exception("Flush error")
387 self._filehandles[fh].release()
388 del self._filehandles[fh]
389 self.inodes.inode_cache.cap_cache()
391 def releasedir(self, fh):
395 def opendir(self, inode):
396 _logger.debug("arv-mount opendir: inode %i", inode)
398 if inode in self.inodes:
399 p = self.inodes[inode]
401 raise llfuse.FUSEError(errno.ENOENT)
403 if not isinstance(p, Directory):
404 raise llfuse.FUSEError(errno.ENOTDIR)
406 fh = next(self._filehandles_counter)
407 if p.parent_inode in self.inodes:
408 parent = self.inodes[p.parent_inode]
410 raise llfuse.FUSEError(errno.EIO)
415 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
419 def readdir(self, fh, off):
420 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
422 if fh in self._filehandles:
423 handle = self._filehandles[fh]
425 raise llfuse.FUSEError(errno.EBADF)
427 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
430 while e < len(handle.entries):
431 if handle.entries[e][1].inode in self.inodes:
433 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
434 except UnicodeEncodeError:
440 st = llfuse.StatvfsData()
441 st.f_bsize = 64 * 1024
454 def _check_writable(self, inode_parent):
455 if inode_parent in self.inodes:
456 p = self.inodes[inode_parent]
458 raise llfuse.FUSEError(errno.ENOENT)
460 if not isinstance(p, Directory):
461 raise llfuse.FUSEError(errno.ENOTDIR)
464 raise llfuse.FUSEError(errno.EPERM)
469 def create(self, inode_parent, name, mode, flags, ctx):
470 p = self._check_writable(inode_parent)
473 # The file entry should have been implicitly created by callback.
475 fh = next(self._filehandles_counter)
476 self._filehandles[fh] = FileHandle(fh, f)
480 return (fh, self.getattr(f.inode))
483 def mkdir(self, inode_parent, name, mode, ctx):
484 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
486 p = self._check_writable(inode_parent)
489 # The dir entry should have been implicitly created by callback.
493 return self.getattr(d.inode)
496 def unlink(self, inode_parent, name):
497 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
498 p = self._check_writable(inode_parent)
502 def rmdir(self, inode_parent, name):
503 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
504 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move *name_old* out of one directory and into another as *name_new*.

    Both parent inodes go through _check_writable, the old parent first and
    then the new one. The destination directory object performs the move,
    receiving the source directory object as its third argument.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
516 if fh in self._filehandles:
517 self._filehandles[fh].flush()
519 def fsync(self, fh, datasync):
522 def fsyncdir(self, fh, datasync):