17778: Merge branch 'master' into 17778-doc-update
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 """
51
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
63 import os
64 import llfuse
65 import errno
66 import stat
67 import threading
68 import arvados
69 import arvados.events
70 import logging
71 import time
72 import threading
73 import itertools
74 import collections
75 import functools
76 import arvados.keep
77 from prometheus_client import Summary
78 import queue
79
80 # Default _notify_queue has a limit of 1000 items, but it really needs to be
81 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
82 # details.
83
84 if hasattr(llfuse, 'capi'):
85     # llfuse < 0.42
86     llfuse.capi._notify_queue = queue.Queue()
87 else:
88     # llfuse >= 0.42
89     llfuse._notify_queue = queue.Queue()
90
91 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
92
93 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
94 from .fusefile import StringFile, FuseArvadosFile
95
96 _logger = logging.getLogger('arvados.arvados_fuse')
97
98 # Uncomment this to enable llfuse debug logging.
99 # log_handler = logging.StreamHandler()
100 # llogger = logging.getLogger('llfuse')
101 # llogger.addHandler(log_handler)
102 # llogger.setLevel(logging.DEBUG)
103
104 class Handle(object):
105     """Connects a numeric file handle to a File or Directory object that has
106     been opened by the client."""
107
108     def __init__(self, fh, obj):
109         self.fh = fh
110         self.obj = obj
111         self.obj.inc_use()
112
113     def release(self):
114         self.obj.dec_use()
115
116     def flush(self):
117         pass
118
119
120 class FileHandle(Handle):
121     """Connects a numeric file handle to a File  object that has
122     been opened by the client."""
123
124     def flush(self):
125         if self.obj.writable():
126             return self.obj.flush()
127
128
129 class DirectoryHandle(Handle):
130     """Connects a numeric file handle to a Directory object that has
131     been opened by the client."""
132
133     def __init__(self, fh, dirobj, entries):
134         super(DirectoryHandle, self).__init__(fh, dirobj)
135         self.entries = entries
136
137
138 class InodeCache(object):
139     """Records the memory footprint of objects and when they are last used.
140
141     When the cache limit is exceeded, the least recently used objects are
142     cleared.  Clearing the object means discarding its contents to release
143     memory.  The next time the object is accessed, it must be re-fetched from
144     the server.  Note that the inode cache limit is a soft limit; the cache
145     limit may be exceeded if necessary to load very large objects, it may also
146     be exceeded if open file handles prevent objects from being cleared.
147
148     """
149
150     def __init__(self, cap, min_entries=4):
151         self._entries = collections.OrderedDict()
152         self._by_uuid = {}
153         self.cap = cap
154         self._total = 0
155         self.min_entries = min_entries
156
157     def total(self):
158         return self._total
159
160     def _remove(self, obj, clear):
161         if clear:
162             # Kernel behavior seems to be that if a file is
163             # referenced, its parents remain referenced too. This
164             # means has_ref() exits early when a collection is not
165             # candidate for eviction.
166             #
167             # By contrast, in_use() doesn't increment references on
168             # parents, so it requires a full tree walk to determine if
169             # a collection is a candidate for eviction.  This takes
170             # .07s for 240000 files, which becomes a major drag when
171             # cap_cache is being called several times a second and
172             # there are multiple non-evictable collections in the
173             # cache.
174             #
175             # So it is important for performance that we do the
176             # has_ref() check first.
177
178             if obj.has_ref(True):
179                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
180                 return
181
182             if obj.in_use():
183                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
184                 return
185
186             obj.kernel_invalidate()
187             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
188             obj.clear()
189
190         # The llfuse lock is released in del_entry(), which is called by
191         # Directory.clear().  While the llfuse lock is released, it can happen
192         # that a reentrant call removes this entry before this call gets to it.
193         # Ensure that the entry is still valid before trying to remove it.
194         if obj.inode not in self._entries:
195             return
196
197         self._total -= obj.cache_size
198         del self._entries[obj.inode]
199         if obj.cache_uuid:
200             self._by_uuid[obj.cache_uuid].remove(obj)
201             if not self._by_uuid[obj.cache_uuid]:
202                 del self._by_uuid[obj.cache_uuid]
203             obj.cache_uuid = None
204         if clear:
205             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
206
207     def cap_cache(self):
208         if self._total > self.cap:
209             for ent in listvalues(self._entries):
210                 if self._total < self.cap or len(self._entries) < self.min_entries:
211                     break
212                 self._remove(ent, True)
213
214     def manage(self, obj):
215         if obj.persisted():
216             obj.cache_size = obj.objsize()
217             self._entries[obj.inode] = obj
218             obj.cache_uuid = obj.uuid()
219             if obj.cache_uuid:
220                 if obj.cache_uuid not in self._by_uuid:
221                     self._by_uuid[obj.cache_uuid] = [obj]
222                 else:
223                     if obj not in self._by_uuid[obj.cache_uuid]:
224                         self._by_uuid[obj.cache_uuid].append(obj)
225             self._total += obj.objsize()
226             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
227                           obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
228             self.cap_cache()
229
230     def touch(self, obj):
231         if obj.persisted():
232             if obj.inode in self._entries:
233                 self._remove(obj, False)
234             self.manage(obj)
235
236     def unmanage(self, obj):
237         if obj.persisted() and obj.inode in self._entries:
238             self._remove(obj, True)
239
240     def find_by_uuid(self, uuid):
241         return self._by_uuid.get(uuid, [])
242
243     def clear(self):
244         self._entries.clear()
245         self._by_uuid.clear()
246         self._total = 0
247
248 class Inodes(object):
249     """Manage the set of inodes.  This is the mapping from a numeric id
250     to a concrete File or Directory object"""
251
252     def __init__(self, inode_cache, encoding="utf-8"):
253         self._entries = {}
254         self._counter = itertools.count(llfuse.ROOT_INODE)
255         self.inode_cache = inode_cache
256         self.encoding = encoding
257         self.deferred_invalidations = []
258
259     def __getitem__(self, item):
260         return self._entries[item]
261
262     def __setitem__(self, key, item):
263         self._entries[key] = item
264
265     def __iter__(self):
266         return iter(self._entries.keys())
267
268     def items(self):
269         return viewitems(self._entries.items())
270
271     def __contains__(self, k):
272         return k in self._entries
273
274     def touch(self, entry):
275         entry._atime = time.time()
276         self.inode_cache.touch(entry)
277
278     def add_entry(self, entry):
279         entry.inode = next(self._counter)
280         if entry.inode == llfuse.ROOT_INODE:
281             entry.inc_ref()
282         self._entries[entry.inode] = entry
283         self.inode_cache.manage(entry)
284         return entry
285
286     def del_entry(self, entry):
287         if entry.ref_count == 0:
288             self.inode_cache.unmanage(entry)
289             del self._entries[entry.inode]
290             with llfuse.lock_released:
291                 entry.finalize()
292             entry.inode = None
293         else:
294             entry.dead = True
295             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
296
297     def invalidate_inode(self, entry):
298         if entry.has_ref(False):
299             # Only necessary if the kernel has previously done a lookup on this
300             # inode and hasn't yet forgotten about it.
301             llfuse.invalidate_inode(entry.inode)
302
303     def invalidate_entry(self, entry, name):
304         if entry.has_ref(False):
305             # Only necessary if the kernel has previously done a lookup on this
306             # inode and hasn't yet forgotten about it.
307             llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
308
309     def clear(self):
310         self.inode_cache.clear()
311
312         for k,v in viewitems(self._entries):
313             try:
314                 v.finalize()
315             except Exception as e:
316                 _logger.exception("Error during finalize of inode %i", k)
317
318         self._entries.clear()
319
320
321 def catch_exceptions(orig_func):
322     """Catch uncaught exceptions and log them consistently."""
323
324     @functools.wraps(orig_func)
325     def catch_exceptions_wrapper(self, *args, **kwargs):
326         try:
327             return orig_func(self, *args, **kwargs)
328         except llfuse.FUSEError:
329             raise
330         except EnvironmentError as e:
331             raise llfuse.FUSEError(e.errno)
332         except arvados.errors.KeepWriteError as e:
333             _logger.error("Keep write error: " + str(e))
334             raise llfuse.FUSEError(errno.EIO)
335         except arvados.errors.NotFoundError as e:
336             _logger.error("Block not found error: " + str(e))
337             raise llfuse.FUSEError(errno.EIO)
338         except:
339             _logger.exception("Unhandled exception during FUSE operation")
340             raise llfuse.FUSEError(errno.EIO)
341
342     return catch_exceptions_wrapper
343
344
345 class Operations(llfuse.Operations):
346     """This is the main interface with llfuse.
347
348     The methods on this object are called by llfuse threads to service FUSE
349     events to query and read from the file system.
350
351     llfuse has its own global lock which is acquired before calling a request handler,
352     so request handlers do not run concurrently unless the lock is explicitly released
353     using 'with llfuse.lock_released:'
354
355     """
356
357     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
358     read_time = fuse_time.labels(op='read')
359     write_time = fuse_time.labels(op='write')
360     destroy_time = fuse_time.labels(op='destroy')
361     on_event_time = fuse_time.labels(op='on_event')
362     getattr_time = fuse_time.labels(op='getattr')
363     setattr_time = fuse_time.labels(op='setattr')
364     lookup_time = fuse_time.labels(op='lookup')
365     forget_time = fuse_time.labels(op='forget')
366     open_time = fuse_time.labels(op='open')
367     release_time = fuse_time.labels(op='release')
368     opendir_time = fuse_time.labels(op='opendir')
369     readdir_time = fuse_time.labels(op='readdir')
370     statfs_time = fuse_time.labels(op='statfs')
371     create_time = fuse_time.labels(op='create')
372     mkdir_time = fuse_time.labels(op='mkdir')
373     unlink_time = fuse_time.labels(op='unlink')
374     rmdir_time = fuse_time.labels(op='rmdir')
375     rename_time = fuse_time.labels(op='rename')
376     flush_time = fuse_time.labels(op='flush')
377
378     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
379         super(Operations, self).__init__()
380
381         self._api_client = api_client
382
383         if not inode_cache:
384             inode_cache = InodeCache(cap=256*1024*1024)
385         self.inodes = Inodes(inode_cache, encoding=encoding)
386         self.uid = uid
387         self.gid = gid
388         self.enable_write = enable_write
389
390         # dict of inode to filehandle
391         self._filehandles = {}
392         self._filehandles_counter = itertools.count(0)
393
394         # Other threads that need to wait until the fuse driver
395         # is fully initialized should wait() on this event object.
396         self.initlock = threading.Event()
397
398         # If we get overlapping shutdown events (e.g., fusermount -u
399         # -z and operations.destroy()) llfuse calls forget() on inodes
400         # that have already been deleted. To avoid this, we make
401         # forget() a no-op if called after destroy().
402         self._shutdown_started = threading.Event()
403
404         self.num_retries = num_retries
405
406         self.read_counter = arvados.keep.Counter()
407         self.write_counter = arvados.keep.Counter()
408         self.read_ops_counter = arvados.keep.Counter()
409         self.write_ops_counter = arvados.keep.Counter()
410
411         self.events = None
412
413     def init(self):
414         # Allow threads that are waiting for the driver to be finished
415         # initializing to continue
416         self.initlock.set()
417
418     def metric_samples(self):
419         return self.fuse_time.collect()[0].samples
420
421     def metric_op_names(self):
422         ops = []
423         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
424             if cur_op not in ops:
425                 ops.append(cur_op)
426         return ops
427
428     def metric_value(self, opname, metric):
429         op_value = [sample.value for sample in self.metric_samples()
430                     if sample.name == metric and sample.labels['op'] == opname]
431         return op_value[0] if len(op_value) == 1 else None
432
433     def metric_sum_func(self, opname):
434         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
435
436     def metric_count_func(self, opname):
437         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
438
439     @destroy_time.time()
440     @catch_exceptions
441     def destroy(self):
442         self._shutdown_started.set()
443         if self.events:
444             self.events.close()
445             self.events = None
446
447         # Different versions of llfuse require and forbid us to
448         # acquire the lock here. See #8345#note-37, #10805#note-9.
449         if LLFUSE_VERSION_0 and llfuse.lock.acquire():
450             # llfuse < 0.42
451             self.inodes.clear()
452             llfuse.lock.release()
453         else:
454             # llfuse >= 0.42
455             self.inodes.clear()
456
457     def access(self, inode, mode, ctx):
458         return True
459
460     def listen_for_events(self):
461         self.events = arvados.events.subscribe(
462             self._api_client,
463             [["event_type", "in", ["create", "update", "delete"]]],
464             self.on_event)
465
466     @on_event_time.time()
467     @catch_exceptions
468     def on_event(self, ev):
469         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
470             return
471         with llfuse.lock:
472             properties = ev.get("properties") or {}
473             old_attrs = properties.get("old_attributes") or {}
474             new_attrs = properties.get("new_attributes") or {}
475
476             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
477                 item.invalidate()
478                 if ev.get("object_kind") == "arvados#collection":
479                     pdh = new_attrs.get("portable_data_hash")
480                     # new_attributes.modified_at currently lacks
481                     # subsecond precision (see #6347) so use event_at
482                     # which should always be the same.
483                     stamp = ev.get("event_at")
484                     if (stamp and pdh and item.writable() and
485                         item.collection is not None and
486                         item.collection.modified() and
487                         new_attrs.get("is_trashed") is not True):
488                         item.update(to_record_version=(stamp, pdh))
489
490             oldowner = old_attrs.get("owner_uuid")
491             newowner = ev.get("object_owner_uuid")
492             for parent in (
493                     self.inodes.inode_cache.find_by_uuid(oldowner) +
494                     self.inodes.inode_cache.find_by_uuid(newowner)):
495                 parent.child_event(ev)
496
497     @getattr_time.time()
498     @catch_exceptions
499     def getattr(self, inode, ctx=None):
500         if inode not in self.inodes:
501             raise llfuse.FUSEError(errno.ENOENT)
502
503         e = self.inodes[inode]
504
505         entry = llfuse.EntryAttributes()
506         entry.st_ino = inode
507         entry.generation = 0
508         entry.entry_timeout = 0
509         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
510
511         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
512         if isinstance(e, Directory):
513             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
514         else:
515             entry.st_mode |= stat.S_IFREG
516             if isinstance(e, FuseArvadosFile):
517                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
518
519         if self.enable_write and e.writable():
520             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
521
522         entry.st_nlink = 1
523         entry.st_uid = self.uid
524         entry.st_gid = self.gid
525         entry.st_rdev = 0
526
527         entry.st_size = e.size()
528
529         entry.st_blksize = 512
530         entry.st_blocks = (entry.st_size // 512) + 1
531         if hasattr(entry, 'st_atime_ns'):
532             # llfuse >= 0.42
533             entry.st_atime_ns = int(e.atime() * 1000000000)
534             entry.st_mtime_ns = int(e.mtime() * 1000000000)
535             entry.st_ctime_ns = int(e.mtime() * 1000000000)
536         else:
537             # llfuse < 0.42
538             entry.st_atime = int(e.atime)
539             entry.st_mtime = int(e.mtime)
540             entry.st_ctime = int(e.mtime)
541
542         return entry
543
544     @setattr_time.time()
545     @catch_exceptions
546     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
547         entry = self.getattr(inode)
548
549         if fh is not None and fh in self._filehandles:
550             handle = self._filehandles[fh]
551             e = handle.obj
552         else:
553             e = self.inodes[inode]
554
555         if fields is None:
556             # llfuse < 0.42
557             update_size = attr.st_size is not None
558         else:
559             # llfuse >= 0.42
560             update_size = fields.update_size
561         if update_size and isinstance(e, FuseArvadosFile):
562             with llfuse.lock_released:
563                 e.arvfile.truncate(attr.st_size)
564                 entry.st_size = e.arvfile.size()
565
566         return entry
567
568     @lookup_time.time()
569     @catch_exceptions
570     def lookup(self, parent_inode, name, ctx=None):
571         name = str(name, self.inodes.encoding)
572         inode = None
573
574         if name == '.':
575             inode = parent_inode
576         else:
577             if parent_inode in self.inodes:
578                 p = self.inodes[parent_inode]
579                 self.inodes.touch(p)
580                 if name == '..':
581                     inode = p.parent_inode
582                 elif isinstance(p, Directory) and name in p:
583                     inode = p[name].inode
584
585         if inode != None:
586             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
587                       parent_inode, name, inode)
588             self.inodes[inode].inc_ref()
589             return self.getattr(inode)
590         else:
591             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
592                       parent_inode, name)
593             raise llfuse.FUSEError(errno.ENOENT)
594
595     @forget_time.time()
596     @catch_exceptions
597     def forget(self, inodes):
598         if self._shutdown_started.is_set():
599             return
600         for inode, nlookup in inodes:
601             ent = self.inodes[inode]
602             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
603             if ent.dec_ref(nlookup) == 0 and ent.dead:
604                 self.inodes.del_entry(ent)
605
606     @open_time.time()
607     @catch_exceptions
608     def open(self, inode, flags, ctx=None):
609         if inode in self.inodes:
610             p = self.inodes[inode]
611         else:
612             raise llfuse.FUSEError(errno.ENOENT)
613
614         if isinstance(p, Directory):
615             raise llfuse.FUSEError(errno.EISDIR)
616
617         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
618             raise llfuse.FUSEError(errno.EPERM)
619
620         fh = next(self._filehandles_counter)
621         self._filehandles[fh] = FileHandle(fh, p)
622         self.inodes.touch(p)
623
624         # Normally, we will have received an "update" event if the
625         # parent collection is stale here. However, even if the parent
626         # collection hasn't changed, the manifest might have been
627         # fetched so long ago that the signatures on the data block
628         # locators have expired. Calling checkupdate() on all
629         # ancestors ensures the signatures will be refreshed if
630         # necessary.
631         while p.parent_inode in self.inodes:
632             if p == self.inodes[p.parent_inode]:
633                 break
634             p = self.inodes[p.parent_inode]
635             self.inodes.touch(p)
636             p.checkupdate()
637
638         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
639
640         return fh
641
642     @read_time.time()
643     @catch_exceptions
644     def read(self, fh, off, size):
645         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
646         self.read_ops_counter.add(1)
647
648         if fh in self._filehandles:
649             handle = self._filehandles[fh]
650         else:
651             raise llfuse.FUSEError(errno.EBADF)
652
653         self.inodes.touch(handle.obj)
654
655         r = handle.obj.readfrom(off, size, self.num_retries)
656         if r:
657             self.read_counter.add(len(r))
658         return r
659
660     @write_time.time()
661     @catch_exceptions
662     def write(self, fh, off, buf):
663         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
664         self.write_ops_counter.add(1)
665
666         if fh in self._filehandles:
667             handle = self._filehandles[fh]
668         else:
669             raise llfuse.FUSEError(errno.EBADF)
670
671         if not handle.obj.writable():
672             raise llfuse.FUSEError(errno.EPERM)
673
674         self.inodes.touch(handle.obj)
675
676         w = handle.obj.writeto(off, buf, self.num_retries)
677         if w:
678             self.write_counter.add(w)
679         return w
680
681     @release_time.time()
682     @catch_exceptions
683     def release(self, fh):
684         if fh in self._filehandles:
685             _logger.debug("arv-mount release fh %i", fh)
686             try:
687                 self._filehandles[fh].flush()
688             except Exception:
689                 raise
690             finally:
691                 self._filehandles[fh].release()
692                 del self._filehandles[fh]
693         self.inodes.inode_cache.cap_cache()
694
695     def releasedir(self, fh):
696         self.release(fh)
697
698     @opendir_time.time()
699     @catch_exceptions
700     def opendir(self, inode, ctx=None):
701         _logger.debug("arv-mount opendir: inode %i", inode)
702
703         if inode in self.inodes:
704             p = self.inodes[inode]
705         else:
706             raise llfuse.FUSEError(errno.ENOENT)
707
708         if not isinstance(p, Directory):
709             raise llfuse.FUSEError(errno.ENOTDIR)
710
711         fh = next(self._filehandles_counter)
712         if p.parent_inode in self.inodes:
713             parent = self.inodes[p.parent_inode]
714         else:
715             raise llfuse.FUSEError(errno.EIO)
716
717         # update atime
718         self.inodes.touch(p)
719         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
720         return fh
721
722     @readdir_time.time()
723     @catch_exceptions
724     def readdir(self, fh, off):
725         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
726
727         if fh in self._filehandles:
728             handle = self._filehandles[fh]
729         else:
730             raise llfuse.FUSEError(errno.EBADF)
731
732         e = off
733         while e < len(handle.entries):
734             if handle.entries[e][1].inode in self.inodes:
735                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
736             e += 1
737
738     @statfs_time.time()
739     @catch_exceptions
740     def statfs(self, ctx=None):
741         st = llfuse.StatvfsData()
742         st.f_bsize = 128 * 1024
743         st.f_blocks = 0
744         st.f_files = 0
745
746         st.f_bfree = 0
747         st.f_bavail = 0
748
749         st.f_ffree = 0
750         st.f_favail = 0
751
752         st.f_frsize = 0
753         return st
754
755     def _check_writable(self, inode_parent):
756         if not self.enable_write:
757             raise llfuse.FUSEError(errno.EROFS)
758
759         if inode_parent in self.inodes:
760             p = self.inodes[inode_parent]
761         else:
762             raise llfuse.FUSEError(errno.ENOENT)
763
764         if not isinstance(p, Directory):
765             raise llfuse.FUSEError(errno.ENOTDIR)
766
767         if not p.writable():
768             raise llfuse.FUSEError(errno.EPERM)
769
770         return p
771
772     @create_time.time()
773     @catch_exceptions
774     def create(self, inode_parent, name, mode, flags, ctx=None):
775         name = name.decode(encoding=self.inodes.encoding)
776         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
777
778         p = self._check_writable(inode_parent)
779         p.create(name)
780
781         # The file entry should have been implicitly created by callback.
782         f = p[name]
783         fh = next(self._filehandles_counter)
784         self._filehandles[fh] = FileHandle(fh, f)
785         self.inodes.touch(p)
786
787         f.inc_ref()
788         return (fh, self.getattr(f.inode))
789
790     @mkdir_time.time()
791     @catch_exceptions
792     def mkdir(self, inode_parent, name, mode, ctx=None):
793         name = name.decode(encoding=self.inodes.encoding)
794         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
795
796         p = self._check_writable(inode_parent)
797         p.mkdir(name)
798
799         # The dir entry should have been implicitly created by callback.
800         d = p[name]
801
802         d.inc_ref()
803         return self.getattr(d.inode)
804
805     @unlink_time.time()
806     @catch_exceptions
807     def unlink(self, inode_parent, name, ctx=None):
808         name = name.decode(encoding=self.inodes.encoding)
809         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
810         p = self._check_writable(inode_parent)
811         p.unlink(name)
812
813     @rmdir_time.time()
814     @catch_exceptions
815     def rmdir(self, inode_parent, name, ctx=None):
816         name = name.decode(encoding=self.inodes.encoding)
817         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
818         p = self._check_writable(inode_parent)
819         p.rmdir(name)
820
821     @rename_time.time()
822     @catch_exceptions
823     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
824         name_old = name_old.decode(encoding=self.inodes.encoding)
825         name_new = name_new.decode(encoding=self.inodes.encoding)
826         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
827         src = self._check_writable(inode_parent_old)
828         dest = self._check_writable(inode_parent_new)
829         dest.rename(name_old, name_new, src)
830
831     @flush_time.time()
832     @catch_exceptions
833     def flush(self, fh):
834         if fh in self._filehandles:
835             self._filehandles[fh].flush()
836
837     def fsync(self, fh, datasync):
838         self.flush(fh)
839
840     def fsyncdir(self, fh, datasync):
841         self.flush(fh)