Merge branch 'main' into 21357-favorites-names
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 """
51
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
63 import os
64 import llfuse
65 import errno
66 import stat
67 import threading
68 import arvados
69 import arvados.events
70 import logging
71 import time
72 import threading
73 import itertools
74 import collections
75 import functools
76 import arvados.keep
77 from prometheus_client import Summary
78 import queue
79
80 # Default _notify_queue has a limit of 1000 items, but it really needs to be
81 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
82 # details.
83
84 if hasattr(llfuse, 'capi'):
85     # llfuse < 0.42
86     llfuse.capi._notify_queue = queue.Queue()
87 else:
88     # llfuse >= 0.42
89     llfuse._notify_queue = queue.Queue()
90
91 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
92
93 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
94 from .fusefile import StringFile, FuseArvadosFile
95
96 _logger = logging.getLogger('arvados.arvados_fuse')
97
98 # Uncomment this to enable llfuse debug logging.
99 # log_handler = logging.StreamHandler()
100 # llogger = logging.getLogger('llfuse')
101 # llogger.addHandler(log_handler)
102 # llogger.setLevel(logging.DEBUG)
103
104 class Handle(object):
105     """Connects a numeric file handle to a File or Directory object that has
106     been opened by the client."""
107
108     def __init__(self, fh, obj):
109         self.fh = fh
110         self.obj = obj
111         self.obj.inc_use()
112
113     def release(self):
114         self.obj.dec_use()
115
116     def flush(self):
117         pass
118
119
120 class FileHandle(Handle):
121     """Connects a numeric file handle to a File  object that has
122     been opened by the client."""
123
124     def flush(self):
125         if self.obj.writable():
126             return self.obj.flush()
127
128
129 class DirectoryHandle(Handle):
130     """Connects a numeric file handle to a Directory object that has
131     been opened by the client."""
132
133     def __init__(self, fh, dirobj, entries):
134         super(DirectoryHandle, self).__init__(fh, dirobj)
135         self.entries = entries
136
137
138 class InodeCache(object):
139     """Records the memory footprint of objects and when they are last used.
140
141     When the cache limit is exceeded, the least recently used objects are
142     cleared.  Clearing the object means discarding its contents to release
143     memory.  The next time the object is accessed, it must be re-fetched from
144     the server.  Note that the inode cache limit is a soft limit; the cache
145     limit may be exceeded if necessary to load very large objects, it may also
146     be exceeded if open file handles prevent objects from being cleared.
147
148     """
149
150     def __init__(self, cap, min_entries=4):
151         self._entries = collections.OrderedDict()
152         self._by_uuid = {}
153         self.cap = cap
154         self._total = 0
155         self.min_entries = min_entries
156
157     def total(self):
158         return self._total
159
160     def _remove(self, obj, clear):
161         if clear:
162             # Kernel behavior seems to be that if a file is
163             # referenced, its parents remain referenced too. This
164             # means has_ref() exits early when a collection is not
165             # candidate for eviction.
166             #
167             # By contrast, in_use() doesn't increment references on
168             # parents, so it requires a full tree walk to determine if
169             # a collection is a candidate for eviction.  This takes
170             # .07s for 240000 files, which becomes a major drag when
171             # cap_cache is being called several times a second and
172             # there are multiple non-evictable collections in the
173             # cache.
174             #
175             # So it is important for performance that we do the
176             # has_ref() check first.
177
178             if obj.has_ref(True):
179                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
180                 return
181
182             if obj.in_use():
183                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
184                 return
185
186             obj.kernel_invalidate()
187             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
188             obj.clear()
189
190         # The llfuse lock is released in del_entry(), which is called by
191         # Directory.clear().  While the llfuse lock is released, it can happen
192         # that a reentrant call removes this entry before this call gets to it.
193         # Ensure that the entry is still valid before trying to remove it.
194         if obj.inode not in self._entries:
195             return
196
197         self._total -= obj.cache_size
198         del self._entries[obj.inode]
199         if obj.cache_uuid:
200             self._by_uuid[obj.cache_uuid].remove(obj)
201             if not self._by_uuid[obj.cache_uuid]:
202                 del self._by_uuid[obj.cache_uuid]
203             obj.cache_uuid = None
204         if clear:
205             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
206
207     def cap_cache(self):
208         if self._total > self.cap:
209             for ent in listvalues(self._entries):
210                 if self._total < self.cap or len(self._entries) < self.min_entries:
211                     break
212                 self._remove(ent, True)
213
214     def manage(self, obj):
215         if obj.persisted():
216             obj.cache_size = obj.objsize()
217             self._entries[obj.inode] = obj
218             obj.cache_uuid = obj.uuid()
219             if obj.cache_uuid:
220                 if obj.cache_uuid not in self._by_uuid:
221                     self._by_uuid[obj.cache_uuid] = [obj]
222                 else:
223                     if obj not in self._by_uuid[obj.cache_uuid]:
224                         self._by_uuid[obj.cache_uuid].append(obj)
225             self._total += obj.objsize()
226             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
227                           obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
228             self.cap_cache()
229
230     def touch(self, obj):
231         if obj.persisted():
232             if obj.inode in self._entries:
233                 self._remove(obj, False)
234             self.manage(obj)
235
236     def unmanage(self, obj):
237         if obj.persisted() and obj.inode in self._entries:
238             self._remove(obj, True)
239
240     def find_by_uuid(self, uuid):
241         return self._by_uuid.get(uuid, [])
242
243     def clear(self):
244         self._entries.clear()
245         self._by_uuid.clear()
246         self._total = 0
247
248 class Inodes(object):
249     """Manage the set of inodes.  This is the mapping from a numeric id
250     to a concrete File or Directory object"""
251
252     def __init__(self, inode_cache, encoding="utf-8"):
253         self._entries = {}
254         self._counter = itertools.count(llfuse.ROOT_INODE)
255         self.inode_cache = inode_cache
256         self.encoding = encoding
257         self.deferred_invalidations = []
258
259     def __getitem__(self, item):
260         return self._entries[item]
261
262     def __setitem__(self, key, item):
263         self._entries[key] = item
264
265     def __iter__(self):
266         return iter(self._entries.keys())
267
268     def items(self):
269         return viewitems(self._entries.items())
270
271     def __contains__(self, k):
272         return k in self._entries
273
274     def touch(self, entry):
275         entry._atime = time.time()
276         self.inode_cache.touch(entry)
277
278     def add_entry(self, entry):
279         entry.inode = next(self._counter)
280         if entry.inode == llfuse.ROOT_INODE:
281             entry.inc_ref()
282         self._entries[entry.inode] = entry
283         self.inode_cache.manage(entry)
284         return entry
285
286     def del_entry(self, entry):
287         if entry.ref_count == 0:
288             self.inode_cache.unmanage(entry)
289             del self._entries[entry.inode]
290             with llfuse.lock_released:
291                 entry.finalize()
292             entry.inode = None
293         else:
294             entry.dead = True
295             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
296
297     def invalidate_inode(self, entry):
298         if entry.has_ref(False):
299             # Only necessary if the kernel has previously done a lookup on this
300             # inode and hasn't yet forgotten about it.
301             llfuse.invalidate_inode(entry.inode)
302
303     def invalidate_entry(self, entry, name):
304         if entry.has_ref(False):
305             # Only necessary if the kernel has previously done a lookup on this
306             # inode and hasn't yet forgotten about it.
307             llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
308
309     def clear(self):
310         self.inode_cache.clear()
311
312         for k,v in viewitems(self._entries):
313             try:
314                 v.finalize()
315             except Exception as e:
316                 _logger.exception("Error during finalize of inode %i", k)
317
318         self._entries.clear()
319
320
321 def catch_exceptions(orig_func):
322     """Catch uncaught exceptions and log them consistently."""
323
324     @functools.wraps(orig_func)
325     def catch_exceptions_wrapper(self, *args, **kwargs):
326         try:
327             return orig_func(self, *args, **kwargs)
328         except llfuse.FUSEError:
329             raise
330         except EnvironmentError as e:
331             raise llfuse.FUSEError(e.errno)
332         except NotImplementedError:
333             raise llfuse.FUSEError(errno.ENOTSUP)
334         except arvados.errors.KeepWriteError as e:
335             _logger.error("Keep write error: " + str(e))
336             raise llfuse.FUSEError(errno.EIO)
337         except arvados.errors.NotFoundError as e:
338             _logger.error("Block not found error: " + str(e))
339             raise llfuse.FUSEError(errno.EIO)
340         except:
341             _logger.exception("Unhandled exception during FUSE operation")
342             raise llfuse.FUSEError(errno.EIO)
343
344     return catch_exceptions_wrapper
345
346
347 class Operations(llfuse.Operations):
348     """This is the main interface with llfuse.
349
350     The methods on this object are called by llfuse threads to service FUSE
351     events to query and read from the file system.
352
353     llfuse has its own global lock which is acquired before calling a request handler,
354     so request handlers do not run concurrently unless the lock is explicitly released
355     using 'with llfuse.lock_released:'
356
357     """
358
359     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
360     read_time = fuse_time.labels(op='read')
361     write_time = fuse_time.labels(op='write')
362     destroy_time = fuse_time.labels(op='destroy')
363     on_event_time = fuse_time.labels(op='on_event')
364     getattr_time = fuse_time.labels(op='getattr')
365     setattr_time = fuse_time.labels(op='setattr')
366     lookup_time = fuse_time.labels(op='lookup')
367     forget_time = fuse_time.labels(op='forget')
368     open_time = fuse_time.labels(op='open')
369     release_time = fuse_time.labels(op='release')
370     opendir_time = fuse_time.labels(op='opendir')
371     readdir_time = fuse_time.labels(op='readdir')
372     statfs_time = fuse_time.labels(op='statfs')
373     create_time = fuse_time.labels(op='create')
374     mkdir_time = fuse_time.labels(op='mkdir')
375     unlink_time = fuse_time.labels(op='unlink')
376     rmdir_time = fuse_time.labels(op='rmdir')
377     rename_time = fuse_time.labels(op='rename')
378     flush_time = fuse_time.labels(op='flush')
379
380     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
381         super(Operations, self).__init__()
382
383         self._api_client = api_client
384
385         if not inode_cache:
386             inode_cache = InodeCache(cap=256*1024*1024)
387         self.inodes = Inodes(inode_cache, encoding=encoding)
388         self.uid = uid
389         self.gid = gid
390         self.enable_write = enable_write
391
392         # dict of inode to filehandle
393         self._filehandles = {}
394         self._filehandles_counter = itertools.count(0)
395
396         # Other threads that need to wait until the fuse driver
397         # is fully initialized should wait() on this event object.
398         self.initlock = threading.Event()
399
400         # If we get overlapping shutdown events (e.g., fusermount -u
401         # -z and operations.destroy()) llfuse calls forget() on inodes
402         # that have already been deleted. To avoid this, we make
403         # forget() a no-op if called after destroy().
404         self._shutdown_started = threading.Event()
405
406         self.num_retries = num_retries
407
408         self.read_counter = arvados.keep.Counter()
409         self.write_counter = arvados.keep.Counter()
410         self.read_ops_counter = arvados.keep.Counter()
411         self.write_ops_counter = arvados.keep.Counter()
412
413         self.events = None
414
415     def init(self):
416         # Allow threads that are waiting for the driver to be finished
417         # initializing to continue
418         self.initlock.set()
419
420     def metric_samples(self):
421         return self.fuse_time.collect()[0].samples
422
423     def metric_op_names(self):
424         ops = []
425         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
426             if cur_op not in ops:
427                 ops.append(cur_op)
428         return ops
429
430     def metric_value(self, opname, metric):
431         op_value = [sample.value for sample in self.metric_samples()
432                     if sample.name == metric and sample.labels['op'] == opname]
433         return op_value[0] if len(op_value) == 1 else None
434
435     def metric_sum_func(self, opname):
436         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
437
438     def metric_count_func(self, opname):
439         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
440
441     @destroy_time.time()
442     @catch_exceptions
443     def destroy(self):
444         self._shutdown_started.set()
445         if self.events:
446             self.events.close()
447             self.events = None
448
449         # Different versions of llfuse require and forbid us to
450         # acquire the lock here. See #8345#note-37, #10805#note-9.
451         if LLFUSE_VERSION_0 and llfuse.lock.acquire():
452             # llfuse < 0.42
453             self.inodes.clear()
454             llfuse.lock.release()
455         else:
456             # llfuse >= 0.42
457             self.inodes.clear()
458
459     def access(self, inode, mode, ctx):
460         return True
461
462     def listen_for_events(self):
463         self.events = arvados.events.subscribe(
464             self._api_client,
465             [["event_type", "in", ["create", "update", "delete"]]],
466             self.on_event)
467
468     @on_event_time.time()
469     @catch_exceptions
470     def on_event(self, ev):
471         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
472             return
473         with llfuse.lock:
474             properties = ev.get("properties") or {}
475             old_attrs = properties.get("old_attributes") or {}
476             new_attrs = properties.get("new_attributes") or {}
477
478             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
479                 item.invalidate()
480
481             oldowner = old_attrs.get("owner_uuid")
482             newowner = ev.get("object_owner_uuid")
483             for parent in (
484                     self.inodes.inode_cache.find_by_uuid(oldowner) +
485                     self.inodes.inode_cache.find_by_uuid(newowner)):
486                 parent.invalidate()
487
488     @getattr_time.time()
489     @catch_exceptions
490     def getattr(self, inode, ctx=None):
491         if inode not in self.inodes:
492             raise llfuse.FUSEError(errno.ENOENT)
493
494         e = self.inodes[inode]
495
496         entry = llfuse.EntryAttributes()
497         entry.st_ino = inode
498         entry.generation = 0
499         entry.entry_timeout = 0
500         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
501
502         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
503         if isinstance(e, Directory):
504             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
505         else:
506             entry.st_mode |= stat.S_IFREG
507             if isinstance(e, FuseArvadosFile):
508                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
509
510         if self.enable_write and e.writable():
511             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
512
513         entry.st_nlink = 1
514         entry.st_uid = self.uid
515         entry.st_gid = self.gid
516         entry.st_rdev = 0
517
518         entry.st_size = e.size()
519
520         entry.st_blksize = 512
521         entry.st_blocks = (entry.st_size // 512) + 1
522         if hasattr(entry, 'st_atime_ns'):
523             # llfuse >= 0.42
524             entry.st_atime_ns = int(e.atime() * 1000000000)
525             entry.st_mtime_ns = int(e.mtime() * 1000000000)
526             entry.st_ctime_ns = int(e.mtime() * 1000000000)
527         else:
528             # llfuse < 0.42
529             entry.st_atime = int(e.atime)
530             entry.st_mtime = int(e.mtime)
531             entry.st_ctime = int(e.mtime)
532
533         return entry
534
535     @setattr_time.time()
536     @catch_exceptions
537     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
538         entry = self.getattr(inode)
539
540         if fh is not None and fh in self._filehandles:
541             handle = self._filehandles[fh]
542             e = handle.obj
543         else:
544             e = self.inodes[inode]
545
546         if fields is None:
547             # llfuse < 0.42
548             update_size = attr.st_size is not None
549         else:
550             # llfuse >= 0.42
551             update_size = fields.update_size
552         if update_size and isinstance(e, FuseArvadosFile):
553             with llfuse.lock_released:
554                 e.arvfile.truncate(attr.st_size)
555                 entry.st_size = e.arvfile.size()
556
557         return entry
558
559     @lookup_time.time()
560     @catch_exceptions
561     def lookup(self, parent_inode, name, ctx=None):
562         name = str(name, self.inodes.encoding)
563         inode = None
564
565         if name == '.':
566             inode = parent_inode
567         else:
568             if parent_inode in self.inodes:
569                 p = self.inodes[parent_inode]
570                 self.inodes.touch(p)
571                 if name == '..':
572                     inode = p.parent_inode
573                 elif isinstance(p, Directory) and name in p:
574                     inode = p[name].inode
575
576         if inode != None:
577             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
578                       parent_inode, name, inode)
579             self.inodes[inode].inc_ref()
580             return self.getattr(inode)
581         else:
582             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
583                       parent_inode, name)
584             raise llfuse.FUSEError(errno.ENOENT)
585
586     @forget_time.time()
587     @catch_exceptions
588     def forget(self, inodes):
589         if self._shutdown_started.is_set():
590             return
591         for inode, nlookup in inodes:
592             ent = self.inodes[inode]
593             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
594             if ent.dec_ref(nlookup) == 0 and ent.dead:
595                 self.inodes.del_entry(ent)
596
597     @open_time.time()
598     @catch_exceptions
599     def open(self, inode, flags, ctx=None):
600         if inode in self.inodes:
601             p = self.inodes[inode]
602         else:
603             raise llfuse.FUSEError(errno.ENOENT)
604
605         if isinstance(p, Directory):
606             raise llfuse.FUSEError(errno.EISDIR)
607
608         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
609             raise llfuse.FUSEError(errno.EPERM)
610
611         fh = next(self._filehandles_counter)
612         self._filehandles[fh] = FileHandle(fh, p)
613         self.inodes.touch(p)
614
615         # Normally, we will have received an "update" event if the
616         # parent collection is stale here. However, even if the parent
617         # collection hasn't changed, the manifest might have been
618         # fetched so long ago that the signatures on the data block
619         # locators have expired. Calling checkupdate() on all
620         # ancestors ensures the signatures will be refreshed if
621         # necessary.
622         while p.parent_inode in self.inodes:
623             if p == self.inodes[p.parent_inode]:
624                 break
625             p = self.inodes[p.parent_inode]
626             self.inodes.touch(p)
627             p.checkupdate()
628
629         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
630
631         return fh
632
633     @read_time.time()
634     @catch_exceptions
635     def read(self, fh, off, size):
636         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
637         self.read_ops_counter.add(1)
638
639         if fh in self._filehandles:
640             handle = self._filehandles[fh]
641         else:
642             raise llfuse.FUSEError(errno.EBADF)
643
644         self.inodes.touch(handle.obj)
645
646         r = handle.obj.readfrom(off, size, self.num_retries)
647         if r:
648             self.read_counter.add(len(r))
649         return r
650
651     @write_time.time()
652     @catch_exceptions
653     def write(self, fh, off, buf):
654         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
655         self.write_ops_counter.add(1)
656
657         if fh in self._filehandles:
658             handle = self._filehandles[fh]
659         else:
660             raise llfuse.FUSEError(errno.EBADF)
661
662         if not handle.obj.writable():
663             raise llfuse.FUSEError(errno.EPERM)
664
665         self.inodes.touch(handle.obj)
666
667         w = handle.obj.writeto(off, buf, self.num_retries)
668         if w:
669             self.write_counter.add(w)
670         return w
671
672     @release_time.time()
673     @catch_exceptions
674     def release(self, fh):
675         if fh in self._filehandles:
676             _logger.debug("arv-mount release fh %i", fh)
677             try:
678                 self._filehandles[fh].flush()
679             except Exception:
680                 raise
681             finally:
682                 self._filehandles[fh].release()
683                 del self._filehandles[fh]
684         self.inodes.inode_cache.cap_cache()
685
686     def releasedir(self, fh):
687         self.release(fh)
688
689     @opendir_time.time()
690     @catch_exceptions
691     def opendir(self, inode, ctx=None):
692         _logger.debug("arv-mount opendir: inode %i", inode)
693
694         if inode in self.inodes:
695             p = self.inodes[inode]
696         else:
697             raise llfuse.FUSEError(errno.ENOENT)
698
699         if not isinstance(p, Directory):
700             raise llfuse.FUSEError(errno.ENOTDIR)
701
702         fh = next(self._filehandles_counter)
703         if p.parent_inode in self.inodes:
704             parent = self.inodes[p.parent_inode]
705         else:
706             raise llfuse.FUSEError(errno.EIO)
707
708         # update atime
709         self.inodes.touch(p)
710         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
711         return fh
712
713     @readdir_time.time()
714     @catch_exceptions
715     def readdir(self, fh, off):
716         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
717
718         if fh in self._filehandles:
719             handle = self._filehandles[fh]
720         else:
721             raise llfuse.FUSEError(errno.EBADF)
722
723         e = off
724         while e < len(handle.entries):
725             if handle.entries[e][1].inode in self.inodes:
726                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
727             e += 1
728
729     @statfs_time.time()
730     @catch_exceptions
731     def statfs(self, ctx=None):
732         st = llfuse.StatvfsData()
733         st.f_bsize = 128 * 1024
734         st.f_blocks = 0
735         st.f_files = 0
736
737         st.f_bfree = 0
738         st.f_bavail = 0
739
740         st.f_ffree = 0
741         st.f_favail = 0
742
743         st.f_frsize = 0
744         return st
745
746     def _check_writable(self, inode_parent):
747         if not self.enable_write:
748             raise llfuse.FUSEError(errno.EROFS)
749
750         if inode_parent in self.inodes:
751             p = self.inodes[inode_parent]
752         else:
753             raise llfuse.FUSEError(errno.ENOENT)
754
755         if not isinstance(p, Directory):
756             raise llfuse.FUSEError(errno.ENOTDIR)
757
758         if not p.writable():
759             raise llfuse.FUSEError(errno.EPERM)
760
761         return p
762
763     @create_time.time()
764     @catch_exceptions
765     def create(self, inode_parent, name, mode, flags, ctx=None):
766         name = name.decode(encoding=self.inodes.encoding)
767         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
768
769         p = self._check_writable(inode_parent)
770         p.create(name)
771
772         # The file entry should have been implicitly created by callback.
773         f = p[name]
774         fh = next(self._filehandles_counter)
775         self._filehandles[fh] = FileHandle(fh, f)
776         self.inodes.touch(p)
777
778         f.inc_ref()
779         return (fh, self.getattr(f.inode))
780
781     @mkdir_time.time()
782     @catch_exceptions
783     def mkdir(self, inode_parent, name, mode, ctx=None):
784         name = name.decode(encoding=self.inodes.encoding)
785         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
786
787         p = self._check_writable(inode_parent)
788         p.mkdir(name)
789
790         # The dir entry should have been implicitly created by callback.
791         d = p[name]
792
793         d.inc_ref()
794         return self.getattr(d.inode)
795
796     @unlink_time.time()
797     @catch_exceptions
798     def unlink(self, inode_parent, name, ctx=None):
799         name = name.decode(encoding=self.inodes.encoding)
800         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
801         p = self._check_writable(inode_parent)
802         p.unlink(name)
803
804     @rmdir_time.time()
805     @catch_exceptions
806     def rmdir(self, inode_parent, name, ctx=None):
807         name = name.decode(encoding=self.inodes.encoding)
808         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
809         p = self._check_writable(inode_parent)
810         p.rmdir(name)
811
812     @rename_time.time()
813     @catch_exceptions
814     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
815         name_old = name_old.decode(encoding=self.inodes.encoding)
816         name_new = name_new.decode(encoding=self.inodes.encoding)
817         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
818         src = self._check_writable(inode_parent_old)
819         dest = self._check_writable(inode_parent_new)
820         dest.rename(name_old, name_new, src)
821
822     @flush_time.time()
823     @catch_exceptions
824     def flush(self, fh):
825         if fh in self._filehandles:
826             self._filehandles[fh].flush()
827
828     def fsync(self, fh, datasync):
829         self.flush(fh)
830
831     def fsyncdir(self, fh, datasync):
832         self.flush(fh)