2 # FUSE driver for Arvados Keep
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
30 _logger = logging.getLogger('arvados.arvados_fuse')
# Send llfuse's own log records to a stream handler at DEBUG verbosity.
llogger = logging.getLogger('llfuse')
log_handler = logging.StreamHandler()
llogger.setLevel(logging.DEBUG)
llogger.addHandler(log_handler)
38 """Connects a numeric file handle to a File or Directory object that has
39 been opened by the client."""
41 def __init__(self, fh, obj):
50 return self.obj.flush()
53 class FileHandle(Handle):
54 """Connects a numeric file handle to a File object that has
55 been opened by the client."""
class DirectoryHandle(Handle):
    """Handle for a Directory object that the client has opened.

    Pairs the numeric file handle with the directory object and keeps
    the entry listing supplied when the handle was created.
    """

    def __init__(self, fh, dirobj, entries):
        """Initialize the base Handle and remember the entry listing.

        fh -- numeric file handle
        dirobj -- the directory object this handle refers to
        entries -- listing stored on the handle as ``self.entries``
        """
        super(DirectoryHandle, self).__init__(fh=fh, obj=dirobj)
        self.entries = entries
68 class InodeCache(object):
69 def __init__(self, cap, min_entries=4):
70 self._entries = collections.OrderedDict()
72 self._counter = itertools.count(0)
75 self.min_entries = min_entries
80 def _remove(self, obj, clear):
81 if clear and not obj.clear():
82 _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
84 self._total -= obj.cache_size
85 del self._entries[obj.cache_priority]
87 del self._by_uuid[obj.cache_uuid]
90 _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
94 #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
95 if self._total > self.cap:
96 for key in list(self._entries.keys()):
97 if self._total < self.cap or len(self._entries) < self.min_entries:
99 self._remove(self._entries[key], True)
101 def manage(self, obj):
103 obj.cache_priority = next(self._counter)
104 obj.cache_size = obj.objsize()
105 self._entries[obj.cache_priority] = obj
106 obj.cache_uuid = obj.uuid()
108 self._by_uuid[obj.cache_uuid] = obj
109 self._total += obj.objsize()
110 _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
113 obj.cache_priority = None
115 def touch(self, obj):
117 if obj.cache_priority in self._entries:
118 self._remove(obj, False)
121 def unmanage(self, obj):
122 if obj.persisted() and obj.cache_priority in self._entries:
123 self._remove(obj, True)
125 def find(self, uuid):
126 return self._by_uuid.get(uuid)
128 class Inodes(object):
129 """Manage the set of inodes. This is the mapping from a numeric id
130 to a concrete File or Directory object"""
132 def __init__(self, inode_cache):
134 self._counter = itertools.count(llfuse.ROOT_INODE)
135 self.inode_cache = inode_cache
137 def __getitem__(self, item):
138 return self._entries[item]
140 def __setitem__(self, key, item):
141 self._entries[key] = item
144 return self._entries.iterkeys()
147 return self._entries.items()
149 def __contains__(self, k):
150 return k in self._entries
152 def touch(self, entry):
153 entry._atime = time.time()
154 self.inode_cache.touch(entry)
156 def add_entry(self, entry):
157 entry.inode = next(self._counter)
158 if entry.inode == llfuse.ROOT_INODE:
160 self._entries[entry.inode] = entry
161 self.inode_cache.manage(entry)
164 def del_entry(self, entry):
165 if entry.ref_count == 0:
166 _logger.debug("Deleting inode %i", entry.inode)
167 self.inode_cache.unmanage(entry)
168 llfuse.invalidate_inode(entry.inode)
169 del self._entries[entry.inode]
173 _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
175 def catch_exceptions(orig_func):
176 @functools.wraps(orig_func)
177 def catch_exceptions_wrapper(self, *args, **kwargs):
179 return orig_func(self, *args, **kwargs)
180 except llfuse.FUSEError:
182 except EnvironmentError as e:
183 raise llfuse.FUSEError(e.errno)
185 _logger.exception("Unhandled exception during FUSE operation")
186 raise llfuse.FUSEError(errno.EIO)
188 return catch_exceptions_wrapper
191 class Operations(llfuse.Operations):
192 """This is the main interface with llfuse.
194 The methods on this object are called by llfuse threads to service FUSE
195 events to query and read from the file system.
197 llfuse has its own global lock which is acquired before calling a request handler,
198 so request handlers do not run concurrently unless the lock is explicitly released
199 using 'with llfuse.lock_released:'
203 def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
204 super(Operations, self).__init__()
207 inode_cache = InodeCache(cap=256*1024*1024)
208 self.inodes = Inodes(inode_cache)
211 self.encoding = encoding
# dict of file handle number to open Handle object
214 self._filehandles = {}
215 self._filehandles_counter = itertools.count(0)
217 # Other threads that need to wait until the fuse driver
218 # is fully initialized should wait() on this event object.
219 self.initlock = threading.Event()
221 self.num_retries = num_retries
226 # Allow threads that are waiting for the driver to be finished
227 # initializing to continue
234 def access(self, inode, mode, ctx):
237 def listen_for_events(self, api_client):
238 self.event = arvados.events.subscribe(api_client,
239 [["event_type", "in", ["create", "update", "delete"]]],
242 def on_event(self, ev):
243 if 'event_type' in ev:
245 item = self.inodes.inode_cache.find(ev["object_uuid"])
250 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
251 if itemparent is not None:
252 itemparent.invalidate()
256 def getattr(self, inode):
257 if inode not in self.inodes:
258 raise llfuse.FUSEError(errno.ENOENT)
260 e = self.inodes[inode]
262 entry = llfuse.EntryAttributes()
265 entry.entry_timeout = 300
266 entry.attr_timeout = 300
268 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
269 if isinstance(e, Directory):
270 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
272 entry.st_mode |= stat.S_IFREG
273 if isinstance(e, FuseArvadosFile):
274 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
277 entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
280 entry.st_uid = self.uid
281 entry.st_gid = self.gid
284 entry.st_size = e.size()
286 entry.st_blksize = 512
287 entry.st_blocks = (e.size()/512)+1
288 entry.st_atime = int(e.atime())
289 entry.st_mtime = int(e.mtime())
290 entry.st_ctime = int(e.mtime())
295 def setattr(self, inode, attr):
296 entry = self.getattr(inode)
298 e = self.inodes[inode]
300 if attr.st_size is not None and isinstance(e, FuseArvadosFile):
301 with llfuse.lock_released:
302 e.arvfile.truncate(attr.st_size)
303 entry.st_size = e.arvfile.size()
308 def lookup(self, parent_inode, name):
309 name = unicode(name, self.encoding)
315 if parent_inode in self.inodes:
316 p = self.inodes[parent_inode]
318 inode = p.parent_inode
319 elif isinstance(p, Directory) and name in p:
320 inode = p[name].inode
323 _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
324 parent_inode, name, inode)
325 self.inodes[inode].inc_ref()
326 return self.getattr(inode)
328 _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
330 raise llfuse.FUSEError(errno.ENOENT)
def forget(self, inodes):
    """Release lookup references on a batch of inodes.

    ``inodes`` is an iterable of (inode, nlookup) pairs.  Each entry's
    reference count is reduced by nlookup via ``dec_ref``; when the
    result is zero and the entry is flagged ``dead``, the entry is
    removed from the inode table.
    """
    for inode, nlookup in inodes:
        ent = self.inodes[inode]
        _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
        remaining = ent.dec_ref(nlookup)
        if remaining != 0:
            continue
        # Only entries already marked dead are dropped once unreferenced.
        if ent.dead:
            self.inodes.del_entry(ent)
341 def open(self, inode, flags):
342 if inode in self.inodes:
343 p = self.inodes[inode]
345 raise llfuse.FUSEError(errno.ENOENT)
347 if isinstance(p, Directory):
348 raise llfuse.FUSEError(errno.EISDIR)
350 if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
351 raise llfuse.FUSEError(errno.EPERM)
353 fh = next(self._filehandles_counter)
354 self._filehandles[fh] = FileHandle(fh, p)
359 def read(self, fh, off, size):
360 _logger.debug("arv-mount read %i %i %i", fh, off, size)
361 if fh in self._filehandles:
362 handle = self._filehandles[fh]
364 raise llfuse.FUSEError(errno.EBADF)
366 self.inodes.touch(handle.obj)
369 return handle.obj.readfrom(off, size, self.num_retries)
370 except arvados.errors.NotFoundError as e:
371 _logger.error("Block not found: " + str(e))
372 raise llfuse.FUSEError(errno.EIO)
375 def write(self, fh, off, buf):
376 _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
377 if fh in self._filehandles:
378 handle = self._filehandles[fh]
380 raise llfuse.FUSEError(errno.EBADF)
382 if not handle.obj.writable():
383 raise llfuse.FUSEError(errno.EPERM)
385 self.inodes.touch(handle.obj)
387 return handle.obj.writeto(off, buf, self.num_retries)
390 def release(self, fh):
391 if fh in self._filehandles:
393 self._filehandles[fh].flush()
394 except EnvironmentError as e:
395 raise llfuse.FUSEError(e.errno)
397 _logger.exception("Flush error")
398 self._filehandles[fh].release()
399 del self._filehandles[fh]
400 self.inodes.inode_cache.cap_cache()
402 def releasedir(self, fh):
406 def opendir(self, inode):
407 _logger.debug("arv-mount opendir: inode %i", inode)
409 if inode in self.inodes:
410 p = self.inodes[inode]
412 raise llfuse.FUSEError(errno.ENOENT)
414 if not isinstance(p, Directory):
415 raise llfuse.FUSEError(errno.ENOTDIR)
417 fh = next(self._filehandles_counter)
418 if p.parent_inode in self.inodes:
419 parent = self.inodes[p.parent_inode]
421 raise llfuse.FUSEError(errno.EIO)
426 self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
430 def readdir(self, fh, off):
431 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
433 if fh in self._filehandles:
434 handle = self._filehandles[fh]
436 raise llfuse.FUSEError(errno.EBADF)
438 _logger.debug("arv-mount handle.dirobj %s", handle.obj)
441 while e < len(handle.entries):
442 if handle.entries[e][1].inode in self.inodes:
444 yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
445 except UnicodeEncodeError:
451 st = llfuse.StatvfsData()
452 st.f_bsize = 64 * 1024
465 def _check_writable(self, inode_parent):
466 if inode_parent in self.inodes:
467 p = self.inodes[inode_parent]
469 raise llfuse.FUSEError(errno.ENOENT)
471 if not isinstance(p, Directory):
472 raise llfuse.FUSEError(errno.ENOTDIR)
475 raise llfuse.FUSEError(errno.EPERM)
480 def create(self, inode_parent, name, mode, flags, ctx):
481 p = self._check_writable(inode_parent)
484 # The file entry should have been implicitly created by callback.
486 fh = next(self._filehandles_counter)
487 self._filehandles[fh] = FileHandle(fh, f)
491 return (fh, self.getattr(f.inode))
494 def mkdir(self, inode_parent, name, mode, ctx):
495 _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
497 p = self._check_writable(inode_parent)
500 # The dir entry should have been implicitly created by callback.
504 return self.getattr(d.inode)
507 def unlink(self, inode_parent, name):
508 _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
509 p = self._check_writable(inode_parent)
513 def rmdir(self, inode_parent, name):
514 _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
515 p = self._check_writable(inode_parent)
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
    """Move an entry from one parent directory to another.

    Both parents are validated with ``_check_writable`` (source first,
    then destination), and the move itself is delegated to the
    destination directory's ``rename``.
    """
    _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
    source_dir = self._check_writable(inode_parent_old)
    target_dir = self._check_writable(inode_parent_new)
    target_dir.rename(name_old, name_new, source_dir)
527 if fh in self._filehandles:
528 self._filehandles[fh].flush()
530 def fsync(self, fh, datasync):
533 def fsyncdir(self, fh, datasync):