21541: Manage memory related to project directories
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 """
51
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future.utils import listitems
58 from future import standard_library
59 standard_library.install_aliases()
60 from builtins import next
61 from builtins import str
62 from builtins import object
63 import os
64 import llfuse
65 import errno
66 import stat
67 import threading
68 import arvados
69 import arvados.events
70 import logging
71 import time
72 import threading
73 import itertools
74 import collections
75 import functools
76 import arvados.keep
77 from prometheus_client import Summary
78 import queue
79
80 # Default _notify_queue has a limit of 1000 items, but it really needs to be
81 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
82 # details.
83
84 if hasattr(llfuse, 'capi'):
85     # llfuse < 0.42
86     llfuse.capi._notify_queue = queue.Queue()
87 else:
88     # llfuse >= 0.42
89     llfuse._notify_queue = queue.Queue()
90
91 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
92
93 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
94 from .fusefile import StringFile, FuseArvadosFile
95
96 _logger = logging.getLogger('arvados.arvados_fuse')
97
98 # Uncomment this to enable llfuse debug logging.
99 # log_handler = logging.StreamHandler()
100 # llogger = logging.getLogger('llfuse')
101 # llogger.addHandler(log_handler)
102 # llogger.setLevel(logging.DEBUG)
103
104 class Handle(object):
105     """Connects a numeric file handle to a File or Directory object that has
106     been opened by the client."""
107
108     def __init__(self, fh, obj):
109         self.fh = fh
110         self.obj = obj
111         self.obj.inc_use()
112
113     def release(self):
114         self.obj.dec_use()
115
116     def flush(self):
117         pass
118
119
120 class FileHandle(Handle):
121     """Connects a numeric file handle to a File  object that has
122     been opened by the client."""
123
124     def flush(self):
125         if self.obj.writable():
126             return self.obj.flush()
127
128
129 class DirectoryHandle(Handle):
130     """Connects a numeric file handle to a Directory object that has
131     been opened by the client."""
132
133     def __init__(self, fh, dirobj, entries):
134         super(DirectoryHandle, self).__init__(fh, dirobj)
135         self.entries = entries
136         for ent in self.entries:
137             ent[1].inc_use()
138
139     def release(self):
140         for ent in self.entries:
141             ent[1].dec_use()
142         super(DirectoryHandle, self).release()
143
144
145 class InodeCache(object):
146     """Records the memory footprint of objects and when they are last used.
147
148     When the cache limit is exceeded, the least recently used objects are
149     cleared.  Clearing the object means discarding its contents to release
150     memory.  The next time the object is accessed, it must be re-fetched from
151     the server.  Note that the inode cache limit is a soft limit; the cache
152     limit may be exceeded if necessary to load very large objects, it may also
153     be exceeded if open file handles prevent objects from being cleared.
154
155     """
156
157     def __init__(self, cap, min_entries=4):
158         self._entries = collections.OrderedDict()
159         self._by_uuid = {}
160         self.cap = cap
161         self._total = 0
162         self.min_entries = min_entries
163
164     def total(self):
165         return self._total
166
167     def _remove(self, obj, clear):
168         if obj.inode is None:
169             return
170         if clear:
171             # Kernel behavior seems to be that if a file is
172             # referenced, its parents remain referenced too. This
173             # means has_ref() exits early when a collection is not
174             # candidate for eviction.
175             #
176             # By contrast, in_use() doesn't increment references on
177             # parents, so it requires a full tree walk to determine if
178             # a collection is a candidate for eviction.  This takes
179             # .07s for 240000 files, which becomes a major drag when
180             # cap_cache is being called several times a second and
181             # there are multiple non-evictable collections in the
182             # cache.
183             #
184             # So it is important for performance that we do the
185             # has_ref() check first.
186
187             if obj.has_ref(True):
188                 #_logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
189                 return
190
191             if obj.in_use():
192                 #_logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
193                 return
194
195             obj.kernel_invalidate()
196             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
197             obj.clear()
198
199         # The llfuse lock is released in del_entry(), which is called by
200         # Directory.clear().  While the llfuse lock is released, it can happen
201         # that a reentrant call removes this entry before this call gets to it.
202         # Ensure that the entry is still valid before trying to remove it.
203         if obj.inode not in self._entries:
204             return
205
206         self._total -= obj.cache_size
207         del self._entries[obj.inode]
208         if obj.cache_uuid:
209             self._by_uuid[obj.cache_uuid].remove(obj)
210             if not self._by_uuid[obj.cache_uuid]:
211                 del self._by_uuid[obj.cache_uuid]
212             obj.cache_uuid = None
213         if clear:
214             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
215
216     def cap_cache(self):
217         _logger.debug("in cap_cache %i, %i", self._total, self.cap)
218         if self._total > self.cap:
219             for ent in listvalues(self._entries):
220                 if self._total < self.cap or len(self._entries) < self.min_entries:
221                     break
222                 self._remove(ent, True)
223             _logger.debug("end cap_cache %i, %i", self._total, self.cap)
224
225     def manage(self, obj):
226         if obj.persisted():
227             obj.cache_size = obj.objsize()
228             self._entries[obj.inode] = obj
229             obj.cache_uuid = obj.uuid()
230             if obj.cache_uuid:
231                 if obj.cache_uuid not in self._by_uuid:
232                     self._by_uuid[obj.cache_uuid] = [obj]
233                 else:
234                     if obj not in self._by_uuid[obj.cache_uuid]:
235                         self._by_uuid[obj.cache_uuid].append(obj)
236             self._total += obj.cache_size
237             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
238                           obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
239
240     def update_cache_size(self, obj):
241         if obj.inode in self._entries:
242             self._total -= obj.cache_size
243             obj.cache_size = obj.objsize()
244             self._total += obj.cache_size
245
246     def touch(self, obj):
247         if obj.persisted():
248             if obj.inode in self._entries:
249                 self._entries.move_to_end(obj.inode)
250             else:
251                 self.manage(obj)
252
253     def unmanage(self, obj):
254         if obj.persisted() and obj.inode in self._entries:
255             self._remove(obj, True)
256
257     def find_by_uuid(self, uuid):
258         return self._by_uuid.get(uuid, [])
259
260     def clear(self):
261         self._entries.clear()
262         self._by_uuid.clear()
263         self._total = 0
264
265 class Inodes(object):
266     """Manage the set of inodes.  This is the mapping from a numeric id
267     to a concrete File or Directory object"""
268
269     def __init__(self, inode_cache, encoding="utf-8"):
270         self._entries = {}
271         self._counter = itertools.count(llfuse.ROOT_INODE)
272         self.inode_cache = inode_cache
273         self.encoding = encoding
274         self.deferred_invalidations = []
275
276     def __getitem__(self, item):
277         return self._entries[item]
278
279     def __setitem__(self, key, item):
280         self._entries[key] = item
281
282     def __iter__(self):
283         return iter(self._entries.keys())
284
285     def items(self):
286         return viewitems(self._entries.items())
287
288     def __contains__(self, k):
289         return k in self._entries
290
291     def touch(self, entry):
292         entry._atime = time.time()
293         self.inode_cache.touch(entry)
294
295     def add_entry(self, entry):
296         entry.inode = next(self._counter)
297         if entry.inode == llfuse.ROOT_INODE:
298             entry.inc_ref()
299         self._entries[entry.inode] = entry
300         self.inode_cache.manage(entry)
301         return entry
302
303     def del_entry(self, entry):
304         if entry.ref_count == 0:
305             self.inode_cache.unmanage(entry)
306             del self._entries[entry.inode]
307             with llfuse.lock_released:
308                 entry.finalize()
309             entry.inode = None
310         else:
311             entry.dead = True
312             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
313
314     def invalidate_inode(self, entry):
315         if entry.has_ref(False):
316             # Only necessary if the kernel has previously done a lookup on this
317             # inode and hasn't yet forgotten about it.
318             llfuse.invalidate_inode(entry.inode)
319
320     def invalidate_entry(self, entry, name):
321         if entry.has_ref(False):
322             # Only necessary if the kernel has previously done a lookup on this
323             # inode and hasn't yet forgotten about it.
324             llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
325
326     def clear(self):
327         self.inode_cache.clear()
328
329         for k,v in viewitems(self._entries):
330             try:
331                 v.finalize()
332             except Exception as e:
333                 _logger.exception("Error during finalize of inode %i", k)
334
335         self._entries.clear()
336
337
338 def catch_exceptions(orig_func):
339     """Catch uncaught exceptions and log them consistently."""
340
341     @functools.wraps(orig_func)
342     def catch_exceptions_wrapper(self, *args, **kwargs):
343         try:
344             return orig_func(self, *args, **kwargs)
345         except llfuse.FUSEError:
346             raise
347         except EnvironmentError as e:
348             raise llfuse.FUSEError(e.errno)
349         except NotImplementedError:
350             raise llfuse.FUSEError(errno.ENOTSUP)
351         except arvados.errors.KeepWriteError as e:
352             _logger.error("Keep write error: " + str(e))
353             raise llfuse.FUSEError(errno.EIO)
354         except arvados.errors.NotFoundError as e:
355             _logger.error("Block not found error: " + str(e))
356             raise llfuse.FUSEError(errno.EIO)
357         except:
358             _logger.exception("Unhandled exception during FUSE operation")
359             raise llfuse.FUSEError(errno.EIO)
360
361     return catch_exceptions_wrapper
362
363
364 class Operations(llfuse.Operations):
365     """This is the main interface with llfuse.
366
367     The methods on this object are called by llfuse threads to service FUSE
368     events to query and read from the file system.
369
370     llfuse has its own global lock which is acquired before calling a request handler,
371     so request handlers do not run concurrently unless the lock is explicitly released
372     using 'with llfuse.lock_released:'
373
374     """
375
376     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
377     read_time = fuse_time.labels(op='read')
378     write_time = fuse_time.labels(op='write')
379     destroy_time = fuse_time.labels(op='destroy')
380     on_event_time = fuse_time.labels(op='on_event')
381     getattr_time = fuse_time.labels(op='getattr')
382     setattr_time = fuse_time.labels(op='setattr')
383     lookup_time = fuse_time.labels(op='lookup')
384     forget_time = fuse_time.labels(op='forget')
385     open_time = fuse_time.labels(op='open')
386     release_time = fuse_time.labels(op='release')
387     opendir_time = fuse_time.labels(op='opendir')
388     readdir_time = fuse_time.labels(op='readdir')
389     statfs_time = fuse_time.labels(op='statfs')
390     create_time = fuse_time.labels(op='create')
391     mkdir_time = fuse_time.labels(op='mkdir')
392     unlink_time = fuse_time.labels(op='unlink')
393     rmdir_time = fuse_time.labels(op='rmdir')
394     rename_time = fuse_time.labels(op='rename')
395     flush_time = fuse_time.labels(op='flush')
396
397     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
398         super(Operations, self).__init__()
399
400         self._api_client = api_client
401
402         if not inode_cache:
403             inode_cache = InodeCache(cap=256*1024*1024)
404         self.inodes = Inodes(inode_cache, encoding=encoding)
405         self.uid = uid
406         self.gid = gid
407         self.enable_write = enable_write
408
409         # dict of inode to filehandle
410         self._filehandles = {}
411         self._filehandles_counter = itertools.count(0)
412
413         # Other threads that need to wait until the fuse driver
414         # is fully initialized should wait() on this event object.
415         self.initlock = threading.Event()
416
417         # If we get overlapping shutdown events (e.g., fusermount -u
418         # -z and operations.destroy()) llfuse calls forget() on inodes
419         # that have already been deleted. To avoid this, we make
420         # forget() a no-op if called after destroy().
421         self._shutdown_started = threading.Event()
422
423         self.num_retries = num_retries
424
425         self.read_counter = arvados.keep.Counter()
426         self.write_counter = arvados.keep.Counter()
427         self.read_ops_counter = arvados.keep.Counter()
428         self.write_ops_counter = arvados.keep.Counter()
429
430         self.events = None
431
432     def init(self):
433         # Allow threads that are waiting for the driver to be finished
434         # initializing to continue
435         self.initlock.set()
436
437     def metric_samples(self):
438         return self.fuse_time.collect()[0].samples
439
440     def metric_op_names(self):
441         ops = []
442         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
443             if cur_op not in ops:
444                 ops.append(cur_op)
445         return ops
446
447     def metric_value(self, opname, metric):
448         op_value = [sample.value for sample in self.metric_samples()
449                     if sample.name == metric and sample.labels['op'] == opname]
450         return op_value[0] if len(op_value) == 1 else None
451
452     def metric_sum_func(self, opname):
453         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
454
455     def metric_count_func(self, opname):
456         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
457
458     @destroy_time.time()
459     @catch_exceptions
460     def destroy(self):
461         self._shutdown_started.set()
462         if self.events:
463             self.events.close()
464             self.events = None
465
466         # Different versions of llfuse require and forbid us to
467         # acquire the lock here. See #8345#note-37, #10805#note-9.
468         if LLFUSE_VERSION_0 and llfuse.lock.acquire():
469             # llfuse < 0.42
470             self.inodes.clear()
471             llfuse.lock.release()
472         else:
473             # llfuse >= 0.42
474             self.inodes.clear()
475
476     def access(self, inode, mode, ctx):
477         return True
478
479     def listen_for_events(self):
480         self.events = arvados.events.subscribe(
481             self._api_client,
482             [["event_type", "in", ["create", "update", "delete"]]],
483             self.on_event)
484
485     @on_event_time.time()
486     @catch_exceptions
487     def on_event(self, ev):
488         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
489             return
490         with llfuse.lock:
491             properties = ev.get("properties") or {}
492             old_attrs = properties.get("old_attributes") or {}
493             new_attrs = properties.get("new_attributes") or {}
494
495             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
496                 item.invalidate()
497
498             oldowner = old_attrs.get("owner_uuid")
499             newowner = ev.get("object_owner_uuid")
500             for parent in (
501                     self.inodes.inode_cache.find_by_uuid(oldowner) +
502                     self.inodes.inode_cache.find_by_uuid(newowner)):
503                 parent.invalidate()
504
505     @getattr_time.time()
506     @catch_exceptions
507     def getattr(self, inode, ctx=None):
508         if inode not in self.inodes:
509             raise llfuse.FUSEError(errno.ENOENT)
510
511         e = self.inodes[inode]
512
513         entry = llfuse.EntryAttributes()
514         entry.st_ino = inode
515         entry.generation = 0
516         entry.entry_timeout = 0
517         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
518
519         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
520         if isinstance(e, Directory):
521             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
522         else:
523             entry.st_mode |= stat.S_IFREG
524             if isinstance(e, FuseArvadosFile):
525                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
526
527         if self.enable_write and e.writable():
528             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
529
530         entry.st_nlink = 1
531         entry.st_uid = self.uid
532         entry.st_gid = self.gid
533         entry.st_rdev = 0
534
535         entry.st_size = e.size()
536
537         entry.st_blksize = 512
538         entry.st_blocks = (entry.st_size // 512) + 1
539         if hasattr(entry, 'st_atime_ns'):
540             # llfuse >= 0.42
541             entry.st_atime_ns = int(e.atime() * 1000000000)
542             entry.st_mtime_ns = int(e.mtime() * 1000000000)
543             entry.st_ctime_ns = int(e.mtime() * 1000000000)
544         else:
545             # llfuse < 0.42
546             entry.st_atime = int(e.atime)
547             entry.st_mtime = int(e.mtime)
548             entry.st_ctime = int(e.mtime)
549
550         return entry
551
552     @setattr_time.time()
553     @catch_exceptions
554     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
555         entry = self.getattr(inode)
556
557         if fh is not None and fh in self._filehandles:
558             handle = self._filehandles[fh]
559             e = handle.obj
560         else:
561             e = self.inodes[inode]
562
563         if fields is None:
564             # llfuse < 0.42
565             update_size = attr.st_size is not None
566         else:
567             # llfuse >= 0.42
568             update_size = fields.update_size
569         if update_size and isinstance(e, FuseArvadosFile):
570             with llfuse.lock_released:
571                 e.arvfile.truncate(attr.st_size)
572                 entry.st_size = e.arvfile.size()
573
574         return entry
575
576     @lookup_time.time()
577     @catch_exceptions
578     def lookup(self, parent_inode, name, ctx=None):
579         name = str(name, self.inodes.encoding)
580         inode = None
581
582         if name == '.':
583             inode = parent_inode
584         elif parent_inode in self.inodes:
585             p = self.inodes[parent_inode]
586             self.inodes.touch(p)
587             if name == '..':
588                 inode = p.parent_inode
589             elif isinstance(p, Directory) and name in p:
590                 inode = p[name].inode
591
592         if inode != None:
593             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
594                       parent_inode, name, inode)
595             self.inodes[inode].inc_ref()
596             return self.getattr(inode)
597         else:
598             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
599                       parent_inode, name)
600             raise llfuse.FUSEError(errno.ENOENT)
601
602     @forget_time.time()
603     @catch_exceptions
604     def forget(self, inodes):
605         if self._shutdown_started.is_set():
606             return
607         for inode, nlookup in inodes:
608             ent = self.inodes[inode]
609             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
610             if ent.dec_ref(nlookup) == 0 and ent.dead:
611                 self.inodes.del_entry(ent)
612
613     @open_time.time()
614     @catch_exceptions
615     def open(self, inode, flags, ctx=None):
616         if inode in self.inodes:
617             p = self.inodes[inode]
618         else:
619             raise llfuse.FUSEError(errno.ENOENT)
620
621         if isinstance(p, Directory):
622             raise llfuse.FUSEError(errno.EISDIR)
623
624         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
625             raise llfuse.FUSEError(errno.EPERM)
626
627         fh = next(self._filehandles_counter)
628         self._filehandles[fh] = FileHandle(fh, p)
629         self.inodes.touch(p)
630
631         # Normally, we will have received an "update" event if the
632         # parent collection is stale here. However, even if the parent
633         # collection hasn't changed, the manifest might have been
634         # fetched so long ago that the signatures on the data block
635         # locators have expired. Calling checkupdate() on all
636         # ancestors ensures the signatures will be refreshed if
637         # necessary.
638         while p.parent_inode in self.inodes:
639             if p == self.inodes[p.parent_inode]:
640                 break
641             p = self.inodes[p.parent_inode]
642             self.inodes.touch(p)
643             p.checkupdate()
644
645         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
646
647         return fh
648
649     @read_time.time()
650     @catch_exceptions
651     def read(self, fh, off, size):
652         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
653         self.read_ops_counter.add(1)
654
655         if fh in self._filehandles:
656             handle = self._filehandles[fh]
657         else:
658             raise llfuse.FUSEError(errno.EBADF)
659
660         self.inodes.touch(handle.obj)
661
662         r = handle.obj.readfrom(off, size, self.num_retries)
663         if r:
664             self.read_counter.add(len(r))
665         return r
666
667     @write_time.time()
668     @catch_exceptions
669     def write(self, fh, off, buf):
670         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
671         self.write_ops_counter.add(1)
672
673         if fh in self._filehandles:
674             handle = self._filehandles[fh]
675         else:
676             raise llfuse.FUSEError(errno.EBADF)
677
678         if not handle.obj.writable():
679             raise llfuse.FUSEError(errno.EPERM)
680
681         self.inodes.touch(handle.obj)
682
683         w = handle.obj.writeto(off, buf, self.num_retries)
684         if w:
685             self.write_counter.add(w)
686         return w
687
688     @release_time.time()
689     @catch_exceptions
690     def release(self, fh):
691         if fh in self._filehandles:
692             _logger.debug("arv-mount release fh %i", fh)
693             try:
694                 self._filehandles[fh].flush()
695             except Exception:
696                 raise
697             finally:
698                 self._filehandles[fh].release()
699                 del self._filehandles[fh]
700         self.inodes.inode_cache.cap_cache()
701
702     def releasedir(self, fh):
703         self.release(fh)
704
705     @opendir_time.time()
706     @catch_exceptions
707     def opendir(self, inode, ctx=None):
708         _logger.debug("arv-mount opendir: inode %i", inode)
709
710         if inode in self.inodes:
711             p = self.inodes[inode]
712         else:
713             raise llfuse.FUSEError(errno.ENOENT)
714
715         if not isinstance(p, Directory):
716             raise llfuse.FUSEError(errno.ENOTDIR)
717
718         fh = next(self._filehandles_counter)
719         if p.parent_inode in self.inodes:
720             parent = self.inodes[p.parent_inode]
721         else:
722             _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
723             raise llfuse.FUSEError(errno.EIO)
724
725         _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
726
727         # update atime
728         self.inodes.touch(p)
729         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
730         return fh
731
732     @readdir_time.time()
733     @catch_exceptions
734     def readdir(self, fh, off):
735         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
736
737         if fh in self._filehandles:
738             handle = self._filehandles[fh]
739         else:
740             raise llfuse.FUSEError(errno.EBADF)
741
742         e = off
743         while e < len(handle.entries):
744             ent = handle.entries[e]
745             if ent[1].inode in self.inodes:
746                 yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
747             e += 1
748
749     @statfs_time.time()
750     @catch_exceptions
751     def statfs(self, ctx=None):
752         st = llfuse.StatvfsData()
753         st.f_bsize = 128 * 1024
754         st.f_blocks = 0
755         st.f_files = 0
756
757         st.f_bfree = 0
758         st.f_bavail = 0
759
760         st.f_ffree = 0
761         st.f_favail = 0
762
763         st.f_frsize = 0
764         return st
765
766     def _check_writable(self, inode_parent):
767         if not self.enable_write:
768             raise llfuse.FUSEError(errno.EROFS)
769
770         if inode_parent in self.inodes:
771             p = self.inodes[inode_parent]
772         else:
773             raise llfuse.FUSEError(errno.ENOENT)
774
775         if not isinstance(p, Directory):
776             raise llfuse.FUSEError(errno.ENOTDIR)
777
778         if not p.writable():
779             raise llfuse.FUSEError(errno.EPERM)
780
781         return p
782
783     @create_time.time()
784     @catch_exceptions
785     def create(self, inode_parent, name, mode, flags, ctx=None):
786         name = name.decode(encoding=self.inodes.encoding)
787         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
788
789         p = self._check_writable(inode_parent)
790         p.create(name)
791
792         # The file entry should have been implicitly created by callback.
793         f = p[name]
794         fh = next(self._filehandles_counter)
795         self._filehandles[fh] = FileHandle(fh, f)
796         self.inodes.touch(p)
797
798         f.inc_ref()
799         return (fh, self.getattr(f.inode))
800
801     @mkdir_time.time()
802     @catch_exceptions
803     def mkdir(self, inode_parent, name, mode, ctx=None):
804         name = name.decode(encoding=self.inodes.encoding)
805         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
806
807         p = self._check_writable(inode_parent)
808         p.mkdir(name)
809
810         # The dir entry should have been implicitly created by callback.
811         d = p[name]
812
813         d.inc_ref()
814         return self.getattr(d.inode)
815
816     @unlink_time.time()
817     @catch_exceptions
818     def unlink(self, inode_parent, name, ctx=None):
819         name = name.decode(encoding=self.inodes.encoding)
820         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
821         p = self._check_writable(inode_parent)
822         p.unlink(name)
823
824     @rmdir_time.time()
825     @catch_exceptions
826     def rmdir(self, inode_parent, name, ctx=None):
827         name = name.decode(encoding=self.inodes.encoding)
828         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
829         p = self._check_writable(inode_parent)
830         p.rmdir(name)
831
832     @rename_time.time()
833     @catch_exceptions
834     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
835         name_old = name_old.decode(encoding=self.inodes.encoding)
836         name_new = name_new.decode(encoding=self.inodes.encoding)
837         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
838         src = self._check_writable(inode_parent_old)
839         dest = self._check_writable(inode_parent_new)
840         dest.rename(name_old, name_new, src)
841
842     @flush_time.time()
843     @catch_exceptions
844     def flush(self, fh):
845         if fh in self._filehandles:
846             self._filehandles[fh].flush()
847
848     def fsync(self, fh, datasync):
849         self.flush(fh)
850
851     def fsyncdir(self, fh, datasync):
852         self.flush(fh)