ef66a97fa362468cd46caa48e164fde27f5842ea
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 """
51
52 from __future__ import absolute_import
53 from __future__ import division
54 from future.utils import viewitems
55 from future.utils import native
56 from future.utils import listvalues
57 from future import standard_library
58 standard_library.install_aliases()
59 from builtins import next
60 from builtins import str
61 from builtins import object
62 import os
63 import sys
64 import llfuse
65 import errno
66 import stat
67 import threading
68 import arvados
69 import pprint
70 import arvados.events
71 import re
72 import apiclient
73 import json
74 import logging
75 import time
76 import _strptime
77 import calendar
78 import threading
79 import itertools
80 import ciso8601
81 import collections
82 import functools
83 import arvados.keep
84 from prometheus_client import Summary
85 import queue
86
87 # Default _notify_queue has a limit of 1000 items, but it really needs to be
88 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
89 # details.
90
91 if hasattr(llfuse, 'capi'):
92     # llfuse < 0.42
93     llfuse.capi._notify_queue = queue.Queue()
94 else:
95     # llfuse >= 0.42
96     llfuse._notify_queue = queue.Queue()
97
98 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
99
100 from .fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
101 from .fusefile import StringFile, FuseArvadosFile
102
103 _logger = logging.getLogger('arvados.arvados_fuse')
104
105 # Uncomment this to enable llfuse debug logging.
106 # log_handler = logging.StreamHandler()
107 # llogger = logging.getLogger('llfuse')
108 # llogger.addHandler(log_handler)
109 # llogger.setLevel(logging.DEBUG)
110
111 class Handle(object):
112     """Connects a numeric file handle to a File or Directory object that has
113     been opened by the client."""
114
115     def __init__(self, fh, obj):
116         self.fh = fh
117         self.obj = obj
118         self.obj.inc_use()
119
120     def release(self):
121         self.obj.dec_use()
122
123     def flush(self):
124         pass
125
126
127 class FileHandle(Handle):
128     """Connects a numeric file handle to a File  object that has
129     been opened by the client."""
130
131     def flush(self):
132         if self.obj.writable():
133             return self.obj.flush()
134
135
136 class DirectoryHandle(Handle):
137     """Connects a numeric file handle to a Directory object that has
138     been opened by the client."""
139
140     def __init__(self, fh, dirobj, entries):
141         super(DirectoryHandle, self).__init__(fh, dirobj)
142         self.entries = entries
143
144
145 class InodeCache(object):
146     """Records the memory footprint of objects and when they are last used.
147
148     When the cache limit is exceeded, the least recently used objects are
149     cleared.  Clearing the object means discarding its contents to release
150     memory.  The next time the object is accessed, it must be re-fetched from
151     the server.  Note that the inode cache limit is a soft limit; the cache
152     limit may be exceeded if necessary to load very large objects, it may also
153     be exceeded if open file handles prevent objects from being cleared.
154
155     """
156
157     def __init__(self, cap, min_entries=4):
158         self._entries = collections.OrderedDict()
159         self._by_uuid = {}
160         self.cap = cap
161         self._total = 0
162         self.min_entries = min_entries
163
164     def total(self):
165         return self._total
166
167     def _remove(self, obj, clear):
168         if clear:
169             # Kernel behavior seems to be that if a file is
170             # referenced, its parents remain referenced too. This
171             # means has_ref() exits early when a collection is not
172             # candidate for eviction.
173             #
174             # By contrast, in_use() doesn't increment references on
175             # parents, so it requires a full tree walk to determine if
176             # a collection is a candidate for eviction.  This takes
177             # .07s for 240000 files, which becomes a major drag when
178             # cap_cache is being called several times a second and
179             # there are multiple non-evictable collections in the
180             # cache.
181             #
182             # So it is important for performance that we do the
183             # has_ref() check first.
184
185             if obj.has_ref(True):
186                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
187                 return
188
189             if obj.in_use():
190                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
191                 return
192
193             obj.kernel_invalidate()
194             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
195             obj.clear()
196
197         # The llfuse lock is released in del_entry(), which is called by
198         # Directory.clear().  While the llfuse lock is released, it can happen
199         # that a reentrant call removes this entry before this call gets to it.
200         # Ensure that the entry is still valid before trying to remove it.
201         if obj.inode not in self._entries:
202             return
203
204         self._total -= obj.cache_size
205         del self._entries[obj.inode]
206         if obj.cache_uuid:
207             self._by_uuid[obj.cache_uuid].remove(obj)
208             if not self._by_uuid[obj.cache_uuid]:
209                 del self._by_uuid[obj.cache_uuid]
210             obj.cache_uuid = None
211         if clear:
212             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
213
214     def cap_cache(self):
215         if self._total > self.cap:
216             for ent in listvalues(self._entries):
217                 if self._total < self.cap or len(self._entries) < self.min_entries:
218                     break
219                 self._remove(ent, True)
220
221     def manage(self, obj):
222         if obj.persisted():
223             obj.cache_size = obj.objsize()
224             self._entries[obj.inode] = obj
225             obj.cache_uuid = obj.uuid()
226             if obj.cache_uuid:
227                 if obj.cache_uuid not in self._by_uuid:
228                     self._by_uuid[obj.cache_uuid] = [obj]
229                 else:
230                     if obj not in self._by_uuid[obj.cache_uuid]:
231                         self._by_uuid[obj.cache_uuid].append(obj)
232             self._total += obj.objsize()
233             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
234                           obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
235             self.cap_cache()
236
237     def touch(self, obj):
238         if obj.persisted():
239             if obj.inode in self._entries:
240                 self._remove(obj, False)
241             self.manage(obj)
242
243     def unmanage(self, obj):
244         if obj.persisted() and obj.inode in self._entries:
245             self._remove(obj, True)
246
247     def find_by_uuid(self, uuid):
248         return self._by_uuid.get(uuid, [])
249
250     def clear(self):
251         self._entries.clear()
252         self._by_uuid.clear()
253         self._total = 0
254
255 class Inodes(object):
256     """Manage the set of inodes.  This is the mapping from a numeric id
257     to a concrete File or Directory object"""
258
259     def __init__(self, inode_cache, encoding="utf-8"):
260         self._entries = {}
261         self._counter = itertools.count(llfuse.ROOT_INODE)
262         self.inode_cache = inode_cache
263         self.encoding = encoding
264         self.deferred_invalidations = []
265
266     def __getitem__(self, item):
267         return self._entries[item]
268
269     def __setitem__(self, key, item):
270         self._entries[key] = item
271
272     def __iter__(self):
273         return iter(self._entries.keys())
274
275     def items(self):
276         return self._entries.items()
277
278     def __contains__(self, k):
279         return k in self._entries
280
281     def touch(self, entry):
282         entry._atime = time.time()
283         self.inode_cache.touch(entry)
284
285     def add_entry(self, entry):
286         entry.inode = next(self._counter)
287         if entry.inode == llfuse.ROOT_INODE:
288             entry.inc_ref()
289         self._entries[entry.inode] = entry
290         self.inode_cache.manage(entry)
291         return entry
292
293     def del_entry(self, entry):
294         if entry.ref_count == 0:
295             self.inode_cache.unmanage(entry)
296             del self._entries[entry.inode]
297             with llfuse.lock_released:
298                 entry.finalize()
299             entry.inode = None
300         else:
301             entry.dead = True
302             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
303
304     def invalidate_inode(self, entry):
305         if entry.has_ref(False):
306             # Only necessary if the kernel has previously done a lookup on this
307             # inode and hasn't yet forgotten about it.
308             llfuse.invalidate_inode(entry.inode)
309
310     def invalidate_entry(self, entry, name):
311         if entry.has_ref(False):
312             # Only necessary if the kernel has previously done a lookup on this
313             # inode and hasn't yet forgotten about it.
314             llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
315
316     def clear(self):
317         self.inode_cache.clear()
318
319         for k,v in viewitems(self._entries):
320             try:
321                 v.finalize()
322             except Exception as e:
323                 _logger.exception("Error during finalize of inode %i", k)
324
325         self._entries.clear()
326
327
328 def catch_exceptions(orig_func):
329     """Catch uncaught exceptions and log them consistently."""
330
331     @functools.wraps(orig_func)
332     def catch_exceptions_wrapper(self, *args, **kwargs):
333         try:
334             return orig_func(self, *args, **kwargs)
335         except llfuse.FUSEError:
336             raise
337         except EnvironmentError as e:
338             raise llfuse.FUSEError(e.errno)
339         except arvados.errors.KeepWriteError as e:
340             _logger.error("Keep write error: " + str(e))
341             raise llfuse.FUSEError(errno.EIO)
342         except arvados.errors.NotFoundError as e:
343             _logger.error("Block not found error: " + str(e))
344             raise llfuse.FUSEError(errno.EIO)
345         except:
346             _logger.exception("Unhandled exception during FUSE operation")
347             raise llfuse.FUSEError(errno.EIO)
348
349     return catch_exceptions_wrapper
350
351
352 class Operations(llfuse.Operations):
353     """This is the main interface with llfuse.
354
355     The methods on this object are called by llfuse threads to service FUSE
356     events to query and read from the file system.
357
358     llfuse has its own global lock which is acquired before calling a request handler,
359     so request handlers do not run concurrently unless the lock is explicitly released
360     using 'with llfuse.lock_released:'
361
362     """
363
364     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
365     read_time = fuse_time.labels(op='read')
366     write_time = fuse_time.labels(op='write')
367     destroy_time = fuse_time.labels(op='destroy')
368     on_event_time = fuse_time.labels(op='on_event')
369     getattr_time = fuse_time.labels(op='getattr')
370     setattr_time = fuse_time.labels(op='setattr')
371     lookup_time = fuse_time.labels(op='lookup')
372     forget_time = fuse_time.labels(op='forget')
373     open_time = fuse_time.labels(op='open')
374     release_time = fuse_time.labels(op='release')
375     opendir_time = fuse_time.labels(op='opendir')
376     readdir_time = fuse_time.labels(op='readdir')
377     statfs_time = fuse_time.labels(op='statfs')
378     create_time = fuse_time.labels(op='create')
379     mkdir_time = fuse_time.labels(op='mkdir')
380     unlink_time = fuse_time.labels(op='unlink')
381     rmdir_time = fuse_time.labels(op='rmdir')
382     rename_time = fuse_time.labels(op='rename')
383     flush_time = fuse_time.labels(op='flush')
384
385     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
386         super(Operations, self).__init__()
387
388         self._api_client = api_client
389
390         if not inode_cache:
391             inode_cache = InodeCache(cap=256*1024*1024)
392         self.inodes = Inodes(inode_cache, encoding=encoding)
393         self.uid = uid
394         self.gid = gid
395         self.enable_write = enable_write
396
397         # dict of inode to filehandle
398         self._filehandles = {}
399         self._filehandles_counter = itertools.count(0)
400
401         # Other threads that need to wait until the fuse driver
402         # is fully initialized should wait() on this event object.
403         self.initlock = threading.Event()
404
405         # If we get overlapping shutdown events (e.g., fusermount -u
406         # -z and operations.destroy()) llfuse calls forget() on inodes
407         # that have already been deleted. To avoid this, we make
408         # forget() a no-op if called after destroy().
409         self._shutdown_started = threading.Event()
410
411         self.num_retries = num_retries
412
413         self.read_counter = arvados.keep.Counter()
414         self.write_counter = arvados.keep.Counter()
415         self.read_ops_counter = arvados.keep.Counter()
416         self.write_ops_counter = arvados.keep.Counter()
417
418         self.events = None
419
420     def init(self):
421         # Allow threads that are waiting for the driver to be finished
422         # initializing to continue
423         self.initlock.set()
424
425     def metric_samples(self):
426         return self.fuse_time.collect()[0].samples
427
428     def metric_op_names(self):
429         ops = []
430         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
431             if cur_op not in ops:
432                 ops.append(cur_op)
433         return ops
434
435     def metric_value(self, opname, metric):
436         op_value = [sample.value for sample in self.metric_samples()
437                     if sample.name == metric and sample.labels['op'] == opname]
438         return op_value[0] if len(op_value) == 1 else None
439
440     def metric_sum_func(self, opname):
441         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
442
443     def metric_count_func(self, opname):
444         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
445
446     @destroy_time.time()
447     @catch_exceptions
448     def destroy(self):
449         self._shutdown_started.set()
450         if self.events:
451             self.events.close()
452             self.events = None
453
454         # Different versions of llfuse require and forbid us to
455         # acquire the lock here. See #8345#note-37, #10805#note-9.
456         if LLFUSE_VERSION_0 and llfuse.lock.acquire():
457             # llfuse < 0.42
458             self.inodes.clear()
459             llfuse.lock.release()
460         else:
461             # llfuse >= 0.42
462             self.inodes.clear()
463
464     def access(self, inode, mode, ctx):
465         return True
466
467     def listen_for_events(self):
468         self.events = arvados.events.subscribe(
469             self._api_client,
470             [["event_type", "in", ["create", "update", "delete"]]],
471             self.on_event)
472
473     @on_event_time.time()
474     @catch_exceptions
475     def on_event(self, ev):
476         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
477             return
478         with llfuse.lock:
479             properties = ev.get("properties") or {}
480             old_attrs = properties.get("old_attributes") or {}
481             new_attrs = properties.get("new_attributes") or {}
482
483             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
484                 item.invalidate()
485                 if ev.get("object_kind") == "arvados#collection":
486                     pdh = new_attrs.get("portable_data_hash")
487                     # new_attributes.modified_at currently lacks
488                     # subsecond precision (see #6347) so use event_at
489                     # which should always be the same.
490                     stamp = ev.get("event_at")
491                     if (stamp and pdh and item.writable() and
492                         item.collection is not None and
493                         item.collection.modified() and
494                         new_attrs.get("is_trashed") is not True):
495                         item.update(to_record_version=(stamp, pdh))
496
497             oldowner = old_attrs.get("owner_uuid")
498             newowner = ev.get("object_owner_uuid")
499             for parent in (
500                     self.inodes.inode_cache.find_by_uuid(oldowner) +
501                     self.inodes.inode_cache.find_by_uuid(newowner)):
502                 parent.child_event(ev)
503
504     @getattr_time.time()
505     @catch_exceptions
506     def getattr(self, inode, ctx=None):
507         if inode not in self.inodes:
508             raise llfuse.FUSEError(errno.ENOENT)
509
510         e = self.inodes[inode]
511
512         entry = llfuse.EntryAttributes()
513         entry.st_ino = inode
514         entry.generation = 0
515         entry.entry_timeout = 0
516         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
517
518         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
519         if isinstance(e, Directory):
520             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
521         else:
522             entry.st_mode |= stat.S_IFREG
523             if isinstance(e, FuseArvadosFile):
524                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
525
526         if self.enable_write and e.writable():
527             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
528
529         entry.st_nlink = 1
530         entry.st_uid = self.uid
531         entry.st_gid = self.gid
532         entry.st_rdev = 0
533
534         entry.st_size = e.size()
535
536         entry.st_blksize = 512
537         entry.st_blocks = (entry.st_size // 512) + 1
538         if hasattr(entry, 'st_atime_ns'):
539             # llfuse >= 0.42
540             entry.st_atime_ns = int(e.atime() * 1000000000)
541             entry.st_mtime_ns = int(e.mtime() * 1000000000)
542             entry.st_ctime_ns = int(e.mtime() * 1000000000)
543         else:
544             # llfuse < 0.42
545             entry.st_atime = int(e.atime)
546             entry.st_mtime = int(e.mtime)
547             entry.st_ctime = int(e.mtime)
548
549         return entry
550
551     @setattr_time.time()
552     @catch_exceptions
553     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
554         entry = self.getattr(inode)
555
556         if fh is not None and fh in self._filehandles:
557             handle = self._filehandles[fh]
558             e = handle.obj
559         else:
560             e = self.inodes[inode]
561
562         if fields is None:
563             # llfuse < 0.42
564             update_size = attr.st_size is not None
565         else:
566             # llfuse >= 0.42
567             update_size = fields.update_size
568         if update_size and isinstance(e, FuseArvadosFile):
569             with llfuse.lock_released:
570                 e.arvfile.truncate(attr.st_size)
571                 entry.st_size = e.arvfile.size()
572
573         return entry
574
575     @lookup_time.time()
576     @catch_exceptions
577     def lookup(self, parent_inode, name, ctx=None):
578         name = str(name, self.inodes.encoding)
579         inode = None
580
581         if name == '.':
582             inode = parent_inode
583         else:
584             if parent_inode in self.inodes:
585                 p = self.inodes[parent_inode]
586                 self.inodes.touch(p)
587                 if name == '..':
588                     inode = p.parent_inode
589                 elif isinstance(p, Directory) and name in p:
590                     inode = p[name].inode
591
592         if inode != None:
593             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
594                       parent_inode, name, inode)
595             self.inodes[inode].inc_ref()
596             return self.getattr(inode)
597         else:
598             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
599                       parent_inode, name)
600             raise llfuse.FUSEError(errno.ENOENT)
601
602     @forget_time.time()
603     @catch_exceptions
604     def forget(self, inodes):
605         if self._shutdown_started.is_set():
606             return
607         for inode, nlookup in inodes:
608             ent = self.inodes[inode]
609             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
610             if ent.dec_ref(nlookup) == 0 and ent.dead:
611                 self.inodes.del_entry(ent)
612
613     @open_time.time()
614     @catch_exceptions
615     def open(self, inode, flags, ctx=None):
616         if inode in self.inodes:
617             p = self.inodes[inode]
618         else:
619             raise llfuse.FUSEError(errno.ENOENT)
620
621         if isinstance(p, Directory):
622             raise llfuse.FUSEError(errno.EISDIR)
623
624         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
625             raise llfuse.FUSEError(errno.EPERM)
626
627         fh = next(self._filehandles_counter)
628         self._filehandles[fh] = FileHandle(fh, p)
629         self.inodes.touch(p)
630
631         # Normally, we will have received an "update" event if the
632         # parent collection is stale here. However, even if the parent
633         # collection hasn't changed, the manifest might have been
634         # fetched so long ago that the signatures on the data block
635         # locators have expired. Calling checkupdate() on all
636         # ancestors ensures the signatures will be refreshed if
637         # necessary.
638         while p.parent_inode in self.inodes:
639             if p == self.inodes[p.parent_inode]:
640                 break
641             p = self.inodes[p.parent_inode]
642             self.inodes.touch(p)
643             p.checkupdate()
644
645         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
646
647         return fh
648
649     @read_time.time()
650     @catch_exceptions
651     def read(self, fh, off, size):
652         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
653         self.read_ops_counter.add(1)
654
655         if fh in self._filehandles:
656             handle = self._filehandles[fh]
657         else:
658             raise llfuse.FUSEError(errno.EBADF)
659
660         self.inodes.touch(handle.obj)
661
662         r = handle.obj.readfrom(off, size, self.num_retries)
663         if r:
664             self.read_counter.add(len(r))
665         return r
666
667     @write_time.time()
668     @catch_exceptions
669     def write(self, fh, off, buf):
670         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
671         self.write_ops_counter.add(1)
672
673         if fh in self._filehandles:
674             handle = self._filehandles[fh]
675         else:
676             raise llfuse.FUSEError(errno.EBADF)
677
678         if not handle.obj.writable():
679             raise llfuse.FUSEError(errno.EPERM)
680
681         self.inodes.touch(handle.obj)
682
683         w = handle.obj.writeto(off, buf, self.num_retries)
684         if w:
685             self.write_counter.add(w)
686         return w
687
688     @release_time.time()
689     @catch_exceptions
690     def release(self, fh):
691         if fh in self._filehandles:
692             _logger.debug("arv-mount release fh %i", fh)
693             try:
694                 self._filehandles[fh].flush()
695             except Exception:
696                 raise
697             finally:
698                 self._filehandles[fh].release()
699                 del self._filehandles[fh]
700         self.inodes.inode_cache.cap_cache()
701
702     def releasedir(self, fh):
703         self.release(fh)
704
705     @opendir_time.time()
706     @catch_exceptions
707     def opendir(self, inode, ctx=None):
708         _logger.debug("arv-mount opendir: inode %i", inode)
709
710         if inode in self.inodes:
711             p = self.inodes[inode]
712         else:
713             raise llfuse.FUSEError(errno.ENOENT)
714
715         if not isinstance(p, Directory):
716             raise llfuse.FUSEError(errno.ENOTDIR)
717
718         fh = next(self._filehandles_counter)
719         if p.parent_inode in self.inodes:
720             parent = self.inodes[p.parent_inode]
721         else:
722             raise llfuse.FUSEError(errno.EIO)
723
724         # update atime
725         self.inodes.touch(p)
726
727         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
728         return fh
729
730     @readdir_time.time()
731     @catch_exceptions
732     def readdir(self, fh, off):
733         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
734
735         if fh in self._filehandles:
736             handle = self._filehandles[fh]
737         else:
738             raise llfuse.FUSEError(errno.EBADF)
739
740         e = off
741         while e < len(handle.entries):
742             if handle.entries[e][1].inode in self.inodes:
743                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
744             e += 1
745
746     @statfs_time.time()
747     @catch_exceptions
748     def statfs(self, ctx=None):
749         st = llfuse.StatvfsData()
750         st.f_bsize = 128 * 1024
751         st.f_blocks = 0
752         st.f_files = 0
753
754         st.f_bfree = 0
755         st.f_bavail = 0
756
757         st.f_ffree = 0
758         st.f_favail = 0
759
760         st.f_frsize = 0
761         return st
762
763     def _check_writable(self, inode_parent):
764         if not self.enable_write:
765             raise llfuse.FUSEError(errno.EROFS)
766
767         if inode_parent in self.inodes:
768             p = self.inodes[inode_parent]
769         else:
770             raise llfuse.FUSEError(errno.ENOENT)
771
772         if not isinstance(p, Directory):
773             raise llfuse.FUSEError(errno.ENOTDIR)
774
775         if not p.writable():
776             raise llfuse.FUSEError(errno.EPERM)
777
778         return p
779
780     @create_time.time()
781     @catch_exceptions
782     def create(self, inode_parent, name, mode, flags, ctx=None):
783         name = name.decode()
784         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
785
786         p = self._check_writable(inode_parent)
787         p.create(name)
788
789         # The file entry should have been implicitly created by callback.
790         f = p[name]
791         fh = next(self._filehandles_counter)
792         self._filehandles[fh] = FileHandle(fh, f)
793         self.inodes.touch(p)
794
795         f.inc_ref()
796         return (fh, self.getattr(f.inode))
797
798     @mkdir_time.time()
799     @catch_exceptions
800     def mkdir(self, inode_parent, name, mode, ctx=None):
801         name = name.decode()
802         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
803
804         p = self._check_writable(inode_parent)
805         p.mkdir(name)
806
807         # The dir entry should have been implicitly created by callback.
808         d = p[name]
809
810         d.inc_ref()
811         return self.getattr(d.inode)
812
813     @unlink_time.time()
814     @catch_exceptions
815     def unlink(self, inode_parent, name, ctx=None):
816         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
817         p = self._check_writable(inode_parent)
818         p.unlink(name.decode())
819
820     @rmdir_time.time()
821     @catch_exceptions
822     def rmdir(self, inode_parent, name, ctx=None):
823         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
824         p = self._check_writable(inode_parent)
825         p.rmdir(name.decode())
826
827     @rename_time.time()
828     @catch_exceptions
829     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
830         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
831         src = self._check_writable(inode_parent_old)
832         dest = self._check_writable(inode_parent_new)
833         dest.rename(name_old.decode(), name_new.decode(), src)
834
835     @flush_time.time()
836     @catch_exceptions
837     def flush(self, fh):
838         if fh in self._filehandles:
839             self._filehandles[fh].flush()
840
841     def fsync(self, fh, datasync):
842         self.flush(fh)
843
844     def fsyncdir(self, fh, datasync):
845         self.flush(fh)