15467: Fix SbatchEnvironmentVariables documentation typo
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 """
51
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
63 import os
64 import sys
65 import llfuse
66 import errno
67 import stat
68 import threading
69 import arvados
70 import pprint
71 import arvados.events
72 import re
73 import apiclient
74 import json
75 import logging
76 import time
77 import _strptime
78 import calendar
79 import threading
80 import itertools
81 import ciso8601
82 import collections
83 import functools
84 import arvados.keep
85 from prometheus_client import Summary
86 import queue
87
88 # Default _notify_queue has a limit of 1000 items, but it really needs to be
89 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
90 # details.
91
92 if hasattr(llfuse, 'capi'):
93     # llfuse < 0.42
94     llfuse.capi._notify_queue = queue.Queue()
95 else:
96     # llfuse >= 0.42
97     llfuse._notify_queue = queue.Queue()
98
99 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
100
101 from .fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
102 from .fusefile import StringFile, FuseArvadosFile
103
104 _logger = logging.getLogger('arvados.arvados_fuse')
105
106 # Uncomment this to enable llfuse debug logging.
107 # log_handler = logging.StreamHandler()
108 # llogger = logging.getLogger('llfuse')
109 # llogger.addHandler(log_handler)
110 # llogger.setLevel(logging.DEBUG)
111
112 class Handle(object):
113     """Connects a numeric file handle to a File or Directory object that has
114     been opened by the client."""
115
116     def __init__(self, fh, obj):
117         self.fh = fh
118         self.obj = obj
119         self.obj.inc_use()
120
121     def release(self):
122         self.obj.dec_use()
123
124     def flush(self):
125         pass
126
127
128 class FileHandle(Handle):
129     """Connects a numeric file handle to a File  object that has
130     been opened by the client."""
131
132     def flush(self):
133         if self.obj.writable():
134             return self.obj.flush()
135
136
137 class DirectoryHandle(Handle):
138     """Connects a numeric file handle to a Directory object that has
139     been opened by the client."""
140
141     def __init__(self, fh, dirobj, entries):
142         super(DirectoryHandle, self).__init__(fh, dirobj)
143         self.entries = entries
144
145
146 class InodeCache(object):
147     """Records the memory footprint of objects and when they are last used.
148
149     When the cache limit is exceeded, the least recently used objects are
150     cleared.  Clearing the object means discarding its contents to release
151     memory.  The next time the object is accessed, it must be re-fetched from
152     the server.  Note that the inode cache limit is a soft limit; the cache
153     limit may be exceeded if necessary to load very large objects, it may also
154     be exceeded if open file handles prevent objects from being cleared.
155
156     """
157
158     def __init__(self, cap, min_entries=4):
159         self._entries = collections.OrderedDict()
160         self._by_uuid = {}
161         self.cap = cap
162         self._total = 0
163         self.min_entries = min_entries
164
165     def total(self):
166         return self._total
167
168     def _remove(self, obj, clear):
169         if clear:
170             # Kernel behavior seems to be that if a file is
171             # referenced, its parents remain referenced too. This
172             # means has_ref() exits early when a collection is not
173             # candidate for eviction.
174             #
175             # By contrast, in_use() doesn't increment references on
176             # parents, so it requires a full tree walk to determine if
177             # a collection is a candidate for eviction.  This takes
178             # .07s for 240000 files, which becomes a major drag when
179             # cap_cache is being called several times a second and
180             # there are multiple non-evictable collections in the
181             # cache.
182             #
183             # So it is important for performance that we do the
184             # has_ref() check first.
185
186             if obj.has_ref(True):
187                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
188                 return
189
190             if obj.in_use():
191                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
192                 return
193
194             obj.kernel_invalidate()
195             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
196             obj.clear()
197
198         # The llfuse lock is released in del_entry(), which is called by
199         # Directory.clear().  While the llfuse lock is released, it can happen
200         # that a reentrant call removes this entry before this call gets to it.
201         # Ensure that the entry is still valid before trying to remove it.
202         if obj.inode not in self._entries:
203             return
204
205         self._total -= obj.cache_size
206         del self._entries[obj.inode]
207         if obj.cache_uuid:
208             self._by_uuid[obj.cache_uuid].remove(obj)
209             if not self._by_uuid[obj.cache_uuid]:
210                 del self._by_uuid[obj.cache_uuid]
211             obj.cache_uuid = None
212         if clear:
213             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
214
215     def cap_cache(self):
216         if self._total > self.cap:
217             for ent in listvalues(self._entries):
218                 if self._total < self.cap or len(self._entries) < self.min_entries:
219                     break
220                 self._remove(ent, True)
221
222     def manage(self, obj):
223         if obj.persisted():
224             obj.cache_size = obj.objsize()
225             self._entries[obj.inode] = obj
226             obj.cache_uuid = obj.uuid()
227             if obj.cache_uuid:
228                 if obj.cache_uuid not in self._by_uuid:
229                     self._by_uuid[obj.cache_uuid] = [obj]
230                 else:
231                     if obj not in self._by_uuid[obj.cache_uuid]:
232                         self._by_uuid[obj.cache_uuid].append(obj)
233             self._total += obj.objsize()
234             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
235                           obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
236             self.cap_cache()
237
238     def touch(self, obj):
239         if obj.persisted():
240             if obj.inode in self._entries:
241                 self._remove(obj, False)
242             self.manage(obj)
243
244     def unmanage(self, obj):
245         if obj.persisted() and obj.inode in self._entries:
246             self._remove(obj, True)
247
248     def find_by_uuid(self, uuid):
249         return self._by_uuid.get(uuid, [])
250
251     def clear(self):
252         self._entries.clear()
253         self._by_uuid.clear()
254         self._total = 0
255
256 class Inodes(object):
257     """Manage the set of inodes.  This is the mapping from a numeric id
258     to a concrete File or Directory object"""
259
260     def __init__(self, inode_cache, encoding="utf-8"):
261         self._entries = {}
262         self._counter = itertools.count(llfuse.ROOT_INODE)
263         self.inode_cache = inode_cache
264         self.encoding = encoding
265         self.deferred_invalidations = []
266
267     def __getitem__(self, item):
268         return self._entries[item]
269
270     def __setitem__(self, key, item):
271         self._entries[key] = item
272
273     def __iter__(self):
274         return iter(self._entries.keys())
275
276     def items(self):
277         return viewitems(self._entries.items())
278
279     def __contains__(self, k):
280         return k in self._entries
281
282     def touch(self, entry):
283         entry._atime = time.time()
284         self.inode_cache.touch(entry)
285
286     def add_entry(self, entry):
287         entry.inode = next(self._counter)
288         if entry.inode == llfuse.ROOT_INODE:
289             entry.inc_ref()
290         self._entries[entry.inode] = entry
291         self.inode_cache.manage(entry)
292         return entry
293
294     def del_entry(self, entry):
295         if entry.ref_count == 0:
296             self.inode_cache.unmanage(entry)
297             del self._entries[entry.inode]
298             with llfuse.lock_released:
299                 entry.finalize()
300             entry.inode = None
301         else:
302             entry.dead = True
303             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
304
305     def invalidate_inode(self, entry):
306         if entry.has_ref(False):
307             # Only necessary if the kernel has previously done a lookup on this
308             # inode and hasn't yet forgotten about it.
309             llfuse.invalidate_inode(entry.inode)
310
311     def invalidate_entry(self, entry, name):
312         if entry.has_ref(False):
313             # Only necessary if the kernel has previously done a lookup on this
314             # inode and hasn't yet forgotten about it.
315             llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
316
317     def clear(self):
318         self.inode_cache.clear()
319
320         for k,v in viewitems(self._entries):
321             try:
322                 v.finalize()
323             except Exception as e:
324                 _logger.exception("Error during finalize of inode %i", k)
325
326         self._entries.clear()
327
328
329 def catch_exceptions(orig_func):
330     """Catch uncaught exceptions and log them consistently."""
331
332     @functools.wraps(orig_func)
333     def catch_exceptions_wrapper(self, *args, **kwargs):
334         try:
335             return orig_func(self, *args, **kwargs)
336         except llfuse.FUSEError:
337             raise
338         except EnvironmentError as e:
339             raise llfuse.FUSEError(e.errno)
340         except arvados.errors.KeepWriteError as e:
341             _logger.error("Keep write error: " + str(e))
342             raise llfuse.FUSEError(errno.EIO)
343         except arvados.errors.NotFoundError as e:
344             _logger.error("Block not found error: " + str(e))
345             raise llfuse.FUSEError(errno.EIO)
346         except:
347             _logger.exception("Unhandled exception during FUSE operation")
348             raise llfuse.FUSEError(errno.EIO)
349
350     return catch_exceptions_wrapper
351
352
353 class Operations(llfuse.Operations):
354     """This is the main interface with llfuse.
355
356     The methods on this object are called by llfuse threads to service FUSE
357     events to query and read from the file system.
358
359     llfuse has its own global lock which is acquired before calling a request handler,
360     so request handlers do not run concurrently unless the lock is explicitly released
361     using 'with llfuse.lock_released:'
362
363     """
364
365     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
366     read_time = fuse_time.labels(op='read')
367     write_time = fuse_time.labels(op='write')
368     destroy_time = fuse_time.labels(op='destroy')
369     on_event_time = fuse_time.labels(op='on_event')
370     getattr_time = fuse_time.labels(op='getattr')
371     setattr_time = fuse_time.labels(op='setattr')
372     lookup_time = fuse_time.labels(op='lookup')
373     forget_time = fuse_time.labels(op='forget')
374     open_time = fuse_time.labels(op='open')
375     release_time = fuse_time.labels(op='release')
376     opendir_time = fuse_time.labels(op='opendir')
377     readdir_time = fuse_time.labels(op='readdir')
378     statfs_time = fuse_time.labels(op='statfs')
379     create_time = fuse_time.labels(op='create')
380     mkdir_time = fuse_time.labels(op='mkdir')
381     unlink_time = fuse_time.labels(op='unlink')
382     rmdir_time = fuse_time.labels(op='rmdir')
383     rename_time = fuse_time.labels(op='rename')
384     flush_time = fuse_time.labels(op='flush')
385
386     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
387         super(Operations, self).__init__()
388
389         self._api_client = api_client
390
391         if not inode_cache:
392             inode_cache = InodeCache(cap=256*1024*1024)
393         self.inodes = Inodes(inode_cache, encoding=encoding)
394         self.uid = uid
395         self.gid = gid
396         self.enable_write = enable_write
397
398         # dict of inode to filehandle
399         self._filehandles = {}
400         self._filehandles_counter = itertools.count(0)
401
402         # Other threads that need to wait until the fuse driver
403         # is fully initialized should wait() on this event object.
404         self.initlock = threading.Event()
405
406         # If we get overlapping shutdown events (e.g., fusermount -u
407         # -z and operations.destroy()) llfuse calls forget() on inodes
408         # that have already been deleted. To avoid this, we make
409         # forget() a no-op if called after destroy().
410         self._shutdown_started = threading.Event()
411
412         self.num_retries = num_retries
413
414         self.read_counter = arvados.keep.Counter()
415         self.write_counter = arvados.keep.Counter()
416         self.read_ops_counter = arvados.keep.Counter()
417         self.write_ops_counter = arvados.keep.Counter()
418
419         self.events = None
420
421     def init(self):
422         # Allow threads that are waiting for the driver to be finished
423         # initializing to continue
424         self.initlock.set()
425
426     def metric_samples(self):
427         return self.fuse_time.collect()[0].samples
428
429     def metric_op_names(self):
430         ops = []
431         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
432             if cur_op not in ops:
433                 ops.append(cur_op)
434         return ops
435
436     def metric_value(self, opname, metric):
437         op_value = [sample.value for sample in self.metric_samples()
438                     if sample.name == metric and sample.labels['op'] == opname]
439         return op_value[0] if len(op_value) == 1 else None
440
441     def metric_sum_func(self, opname):
442         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
443
444     def metric_count_func(self, opname):
445         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
446
447     @destroy_time.time()
448     @catch_exceptions
449     def destroy(self):
450         self._shutdown_started.set()
451         if self.events:
452             self.events.close()
453             self.events = None
454
455         # Different versions of llfuse require and forbid us to
456         # acquire the lock here. See #8345#note-37, #10805#note-9.
457         if LLFUSE_VERSION_0 and llfuse.lock.acquire():
458             # llfuse < 0.42
459             self.inodes.clear()
460             llfuse.lock.release()
461         else:
462             # llfuse >= 0.42
463             self.inodes.clear()
464
465     def access(self, inode, mode, ctx):
466         return True
467
468     def listen_for_events(self):
469         self.events = arvados.events.subscribe(
470             self._api_client,
471             [["event_type", "in", ["create", "update", "delete"]]],
472             self.on_event)
473
474     @on_event_time.time()
475     @catch_exceptions
476     def on_event(self, ev):
477         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
478             return
479         with llfuse.lock:
480             properties = ev.get("properties") or {}
481             old_attrs = properties.get("old_attributes") or {}
482             new_attrs = properties.get("new_attributes") or {}
483
484             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
485                 item.invalidate()
486                 if ev.get("object_kind") == "arvados#collection":
487                     pdh = new_attrs.get("portable_data_hash")
488                     # new_attributes.modified_at currently lacks
489                     # subsecond precision (see #6347) so use event_at
490                     # which should always be the same.
491                     stamp = ev.get("event_at")
492                     if (stamp and pdh and item.writable() and
493                         item.collection is not None and
494                         item.collection.modified() and
495                         new_attrs.get("is_trashed") is not True):
496                         item.update(to_record_version=(stamp, pdh))
497
498             oldowner = old_attrs.get("owner_uuid")
499             newowner = ev.get("object_owner_uuid")
500             for parent in (
501                     self.inodes.inode_cache.find_by_uuid(oldowner) +
502                     self.inodes.inode_cache.find_by_uuid(newowner)):
503                 parent.child_event(ev)
504
505     @getattr_time.time()
506     @catch_exceptions
507     def getattr(self, inode, ctx=None):
508         if inode not in self.inodes:
509             raise llfuse.FUSEError(errno.ENOENT)
510
511         e = self.inodes[inode]
512
513         entry = llfuse.EntryAttributes()
514         entry.st_ino = inode
515         entry.generation = 0
516         entry.entry_timeout = 0
517         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
518
519         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
520         if isinstance(e, Directory):
521             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
522         else:
523             entry.st_mode |= stat.S_IFREG
524             if isinstance(e, FuseArvadosFile):
525                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
526
527         if self.enable_write and e.writable():
528             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
529
530         entry.st_nlink = 1
531         entry.st_uid = self.uid
532         entry.st_gid = self.gid
533         entry.st_rdev = 0
534
535         entry.st_size = e.size()
536
537         entry.st_blksize = 512
538         entry.st_blocks = (entry.st_size // 512) + 1
539         if hasattr(entry, 'st_atime_ns'):
540             # llfuse >= 0.42
541             entry.st_atime_ns = int(e.atime() * 1000000000)
542             entry.st_mtime_ns = int(e.mtime() * 1000000000)
543             entry.st_ctime_ns = int(e.mtime() * 1000000000)
544         else:
545             # llfuse < 0.42
546             entry.st_atime = int(e.atime)
547             entry.st_mtime = int(e.mtime)
548             entry.st_ctime = int(e.mtime)
549
550         return entry
551
552     @setattr_time.time()
553     @catch_exceptions
554     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
555         entry = self.getattr(inode)
556
557         if fh is not None and fh in self._filehandles:
558             handle = self._filehandles[fh]
559             e = handle.obj
560         else:
561             e = self.inodes[inode]
562
563         if fields is None:
564             # llfuse < 0.42
565             update_size = attr.st_size is not None
566         else:
567             # llfuse >= 0.42
568             update_size = fields.update_size
569         if update_size and isinstance(e, FuseArvadosFile):
570             with llfuse.lock_released:
571                 e.arvfile.truncate(attr.st_size)
572                 entry.st_size = e.arvfile.size()
573
574         return entry
575
576     @lookup_time.time()
577     @catch_exceptions
578     def lookup(self, parent_inode, name, ctx=None):
579         name = str(name, self.inodes.encoding)
580         inode = None
581
582         if name == '.':
583             inode = parent_inode
584         else:
585             if parent_inode in self.inodes:
586                 p = self.inodes[parent_inode]
587                 self.inodes.touch(p)
588                 if name == '..':
589                     inode = p.parent_inode
590                 elif isinstance(p, Directory) and name in p:
591                     inode = p[name].inode
592
593         if inode != None:
594             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
595                       parent_inode, name, inode)
596             self.inodes[inode].inc_ref()
597             return self.getattr(inode)
598         else:
599             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
600                       parent_inode, name)
601             raise llfuse.FUSEError(errno.ENOENT)
602
603     @forget_time.time()
604     @catch_exceptions
605     def forget(self, inodes):
606         if self._shutdown_started.is_set():
607             return
608         for inode, nlookup in inodes:
609             ent = self.inodes[inode]
610             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
611             if ent.dec_ref(nlookup) == 0 and ent.dead:
612                 self.inodes.del_entry(ent)
613
614     @open_time.time()
615     @catch_exceptions
616     def open(self, inode, flags, ctx=None):
617         if inode in self.inodes:
618             p = self.inodes[inode]
619         else:
620             raise llfuse.FUSEError(errno.ENOENT)
621
622         if isinstance(p, Directory):
623             raise llfuse.FUSEError(errno.EISDIR)
624
625         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
626             raise llfuse.FUSEError(errno.EPERM)
627
628         fh = next(self._filehandles_counter)
629         self._filehandles[fh] = FileHandle(fh, p)
630         self.inodes.touch(p)
631
632         # Normally, we will have received an "update" event if the
633         # parent collection is stale here. However, even if the parent
634         # collection hasn't changed, the manifest might have been
635         # fetched so long ago that the signatures on the data block
636         # locators have expired. Calling checkupdate() on all
637         # ancestors ensures the signatures will be refreshed if
638         # necessary.
639         while p.parent_inode in self.inodes:
640             if p == self.inodes[p.parent_inode]:
641                 break
642             p = self.inodes[p.parent_inode]
643             self.inodes.touch(p)
644             p.checkupdate()
645
646         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
647
648         return fh
649
650     @read_time.time()
651     @catch_exceptions
652     def read(self, fh, off, size):
653         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
654         self.read_ops_counter.add(1)
655
656         if fh in self._filehandles:
657             handle = self._filehandles[fh]
658         else:
659             raise llfuse.FUSEError(errno.EBADF)
660
661         self.inodes.touch(handle.obj)
662
663         r = handle.obj.readfrom(off, size, self.num_retries)
664         if r:
665             self.read_counter.add(len(r))
666         return r
667
668     @write_time.time()
669     @catch_exceptions
670     def write(self, fh, off, buf):
671         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
672         self.write_ops_counter.add(1)
673
674         if fh in self._filehandles:
675             handle = self._filehandles[fh]
676         else:
677             raise llfuse.FUSEError(errno.EBADF)
678
679         if not handle.obj.writable():
680             raise llfuse.FUSEError(errno.EPERM)
681
682         self.inodes.touch(handle.obj)
683
684         w = handle.obj.writeto(off, buf, self.num_retries)
685         if w:
686             self.write_counter.add(w)
687         return w
688
689     @release_time.time()
690     @catch_exceptions
691     def release(self, fh):
692         if fh in self._filehandles:
693             _logger.debug("arv-mount release fh %i", fh)
694             try:
695                 self._filehandles[fh].flush()
696             except Exception:
697                 raise
698             finally:
699                 self._filehandles[fh].release()
700                 del self._filehandles[fh]
701         self.inodes.inode_cache.cap_cache()
702
703     def releasedir(self, fh):
704         self.release(fh)
705
706     @opendir_time.time()
707     @catch_exceptions
708     def opendir(self, inode, ctx=None):
709         _logger.debug("arv-mount opendir: inode %i", inode)
710
711         if inode in self.inodes:
712             p = self.inodes[inode]
713         else:
714             raise llfuse.FUSEError(errno.ENOENT)
715
716         if not isinstance(p, Directory):
717             raise llfuse.FUSEError(errno.ENOTDIR)
718
719         fh = next(self._filehandles_counter)
720         if p.parent_inode in self.inodes:
721             parent = self.inodes[p.parent_inode]
722         else:
723             raise llfuse.FUSEError(errno.EIO)
724
725         # update atime
726         self.inodes.touch(p)
727         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
728         return fh
729
730     @readdir_time.time()
731     @catch_exceptions
732     def readdir(self, fh, off):
733         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
734
735         if fh in self._filehandles:
736             handle = self._filehandles[fh]
737         else:
738             raise llfuse.FUSEError(errno.EBADF)
739
740         e = off
741         while e < len(handle.entries):
742             if handle.entries[e][1].inode in self.inodes:
743                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
744             e += 1
745
746     @statfs_time.time()
747     @catch_exceptions
748     def statfs(self, ctx=None):
749         st = llfuse.StatvfsData()
750         st.f_bsize = 128 * 1024
751         st.f_blocks = 0
752         st.f_files = 0
753
754         st.f_bfree = 0
755         st.f_bavail = 0
756
757         st.f_ffree = 0
758         st.f_favail = 0
759
760         st.f_frsize = 0
761         return st
762
763     def _check_writable(self, inode_parent):
764         if not self.enable_write:
765             raise llfuse.FUSEError(errno.EROFS)
766
767         if inode_parent in self.inodes:
768             p = self.inodes[inode_parent]
769         else:
770             raise llfuse.FUSEError(errno.ENOENT)
771
772         if not isinstance(p, Directory):
773             raise llfuse.FUSEError(errno.ENOTDIR)
774
775         if not p.writable():
776             raise llfuse.FUSEError(errno.EPERM)
777
778         return p
779
780     @create_time.time()
781     @catch_exceptions
782     def create(self, inode_parent, name, mode, flags, ctx=None):
783         name = name.decode()
784         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
785
786         p = self._check_writable(inode_parent)
787         p.create(name)
788
789         # The file entry should have been implicitly created by callback.
790         f = p[name]
791         fh = next(self._filehandles_counter)
792         self._filehandles[fh] = FileHandle(fh, f)
793         self.inodes.touch(p)
794
795         f.inc_ref()
796         return (fh, self.getattr(f.inode))
797
798     @mkdir_time.time()
799     @catch_exceptions
800     def mkdir(self, inode_parent, name, mode, ctx=None):
801         name = name.decode()
802         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
803
804         p = self._check_writable(inode_parent)
805         p.mkdir(name)
806
807         # The dir entry should have been implicitly created by callback.
808         d = p[name]
809
810         d.inc_ref()
811         return self.getattr(d.inode)
812
813     @unlink_time.time()
814     @catch_exceptions
815     def unlink(self, inode_parent, name, ctx=None):
816         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
817         p = self._check_writable(inode_parent)
818         p.unlink(name.decode())
819
820     @rmdir_time.time()
821     @catch_exceptions
822     def rmdir(self, inode_parent, name, ctx=None):
823         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
824         p = self._check_writable(inode_parent)
825         p.rmdir(name.decode())
826
827     @rename_time.time()
828     @catch_exceptions
829     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
830         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
831         src = self._check_writable(inode_parent_old)
832         dest = self._check_writable(inode_parent_new)
833         dest.rename(name_old.decode(), name_new.decode(), src)
834
835     @flush_time.time()
836     @catch_exceptions
837     def flush(self, fh):
838         if fh in self._filehandles:
839             self._filehandles[fh].flush()
840
841     def fsync(self, fh, datasync):
842         self.flush(fh)
843
844     def fsyncdir(self, fh, datasync):
845         self.flush(fh)