Merge branch 'master' into 14669-java-sdk-v2
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 """
51
52 import os
53 import sys
54 import llfuse
55 import errno
56 import stat
57 import threading
58 import arvados
59 import pprint
60 import arvados.events
61 import re
62 import apiclient
63 import json
64 import logging
65 import time
66 import _strptime
67 import calendar
68 import threading
69 import itertools
70 import ciso8601
71 import collections
72 import functools
73 import arvados.keep
74 from prometheus_client import Summary
75
76 import Queue
77
78 # Default _notify_queue has a limit of 1000 items, but it really needs to be
79 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
80 # details.
81
82 if hasattr(llfuse, 'capi'):
83     # llfuse < 0.42
84     llfuse.capi._notify_queue = Queue.Queue()
85 else:
86     # llfuse >= 0.42
87     llfuse._notify_queue = Queue.Queue()
88
89 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
90
91 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
92 from fusefile import StringFile, FuseArvadosFile
93
94 _logger = logging.getLogger('arvados.arvados_fuse')
95
96 # Uncomment this to enable llfuse debug logging.
97 # log_handler = logging.StreamHandler()
98 # llogger = logging.getLogger('llfuse')
99 # llogger.addHandler(log_handler)
100 # llogger.setLevel(logging.DEBUG)
101
102 class Handle(object):
103     """Connects a numeric file handle to a File or Directory object that has
104     been opened by the client."""
105
106     def __init__(self, fh, obj):
107         self.fh = fh
108         self.obj = obj
109         self.obj.inc_use()
110
111     def release(self):
112         self.obj.dec_use()
113
114     def flush(self):
115         pass
116
117
118 class FileHandle(Handle):
119     """Connects a numeric file handle to a File  object that has
120     been opened by the client."""
121
122     def flush(self):
123         if self.obj.writable():
124             return self.obj.flush()
125
126
127 class DirectoryHandle(Handle):
128     """Connects a numeric file handle to a Directory object that has
129     been opened by the client."""
130
131     def __init__(self, fh, dirobj, entries):
132         super(DirectoryHandle, self).__init__(fh, dirobj)
133         self.entries = entries
134
135
136 class InodeCache(object):
137     """Records the memory footprint of objects and when they are last used.
138
139     When the cache limit is exceeded, the least recently used objects are
140     cleared.  Clearing the object means discarding its contents to release
141     memory.  The next time the object is accessed, it must be re-fetched from
142     the server.  Note that the inode cache limit is a soft limit; the cache
143     limit may be exceeded if necessary to load very large objects, it may also
144     be exceeded if open file handles prevent objects from being cleared.
145
146     """
147
148     def __init__(self, cap, min_entries=4):
149         self._entries = collections.OrderedDict()
150         self._by_uuid = {}
151         self.cap = cap
152         self._total = 0
153         self.min_entries = min_entries
154
155     def total(self):
156         return self._total
157
158     def _remove(self, obj, clear):
159         if clear:
160             # Kernel behavior seems to be that if a file is
161             # referenced, its parents remain referenced too. This
162             # means has_ref() exits early when a collection is not
163             # candidate for eviction.
164             #
165             # By contrast, in_use() doesn't increment references on
166             # parents, so it requires a full tree walk to determine if
167             # a collection is a candidate for eviction.  This takes
168             # .07s for 240000 files, which becomes a major drag when
169             # cap_cache is being called several times a second and
170             # there are multiple non-evictable collections in the
171             # cache.
172             #
173             # So it is important for performance that we do the
174             # has_ref() check first.
175
176             if obj.has_ref(True):
177                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
178                 return
179
180             if obj.in_use():
181                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
182                 return
183
184             obj.kernel_invalidate()
185             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
186             obj.clear()
187
188         # The llfuse lock is released in del_entry(), which is called by
189         # Directory.clear().  While the llfuse lock is released, it can happen
190         # that a reentrant call removes this entry before this call gets to it.
191         # Ensure that the entry is still valid before trying to remove it.
192         if obj.inode not in self._entries:
193             return
194
195         self._total -= obj.cache_size
196         del self._entries[obj.inode]
197         if obj.cache_uuid:
198             self._by_uuid[obj.cache_uuid].remove(obj)
199             if not self._by_uuid[obj.cache_uuid]:
200                 del self._by_uuid[obj.cache_uuid]
201             obj.cache_uuid = None
202         if clear:
203             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
204
205     def cap_cache(self):
206         if self._total > self.cap:
207             for ent in self._entries.values():
208                 if self._total < self.cap or len(self._entries) < self.min_entries:
209                     break
210                 self._remove(ent, True)
211
212     def manage(self, obj):
213         if obj.persisted():
214             obj.cache_size = obj.objsize()
215             self._entries[obj.inode] = obj
216             obj.cache_uuid = obj.uuid()
217             if obj.cache_uuid:
218                 if obj.cache_uuid not in self._by_uuid:
219                     self._by_uuid[obj.cache_uuid] = [obj]
220                 else:
221                     if obj not in self._by_uuid[obj.cache_uuid]:
222                         self._by_uuid[obj.cache_uuid].append(obj)
223             self._total += obj.objsize()
224             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
225                           obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
226             self.cap_cache()
227
228     def touch(self, obj):
229         if obj.persisted():
230             if obj.inode in self._entries:
231                 self._remove(obj, False)
232             self.manage(obj)
233
234     def unmanage(self, obj):
235         if obj.persisted() and obj.inode in self._entries:
236             self._remove(obj, True)
237
238     def find_by_uuid(self, uuid):
239         return self._by_uuid.get(uuid, [])
240
241     def clear(self):
242         self._entries.clear()
243         self._by_uuid.clear()
244         self._total = 0
245
246 class Inodes(object):
247     """Manage the set of inodes.  This is the mapping from a numeric id
248     to a concrete File or Directory object"""
249
250     def __init__(self, inode_cache, encoding="utf-8"):
251         self._entries = {}
252         self._counter = itertools.count(llfuse.ROOT_INODE)
253         self.inode_cache = inode_cache
254         self.encoding = encoding
255         self.deferred_invalidations = []
256
257     def __getitem__(self, item):
258         return self._entries[item]
259
260     def __setitem__(self, key, item):
261         self._entries[key] = item
262
263     def __iter__(self):
264         return self._entries.iterkeys()
265
266     def items(self):
267         return self._entries.items()
268
269     def __contains__(self, k):
270         return k in self._entries
271
272     def touch(self, entry):
273         entry._atime = time.time()
274         self.inode_cache.touch(entry)
275
276     def add_entry(self, entry):
277         entry.inode = next(self._counter)
278         if entry.inode == llfuse.ROOT_INODE:
279             entry.inc_ref()
280         self._entries[entry.inode] = entry
281         self.inode_cache.manage(entry)
282         return entry
283
284     def del_entry(self, entry):
285         if entry.ref_count == 0:
286             self.inode_cache.unmanage(entry)
287             del self._entries[entry.inode]
288             with llfuse.lock_released:
289                 entry.finalize()
290             entry.inode = None
291         else:
292             entry.dead = True
293             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
294
295     def invalidate_inode(self, entry):
296         if entry.has_ref(False):
297             # Only necessary if the kernel has previously done a lookup on this
298             # inode and hasn't yet forgotten about it.
299             llfuse.invalidate_inode(entry.inode)
300
301     def invalidate_entry(self, entry, name):
302         if entry.has_ref(False):
303             # Only necessary if the kernel has previously done a lookup on this
304             # inode and hasn't yet forgotten about it.
305             llfuse.invalidate_entry(entry.inode, name.encode(self.encoding))
306
307     def clear(self):
308         self.inode_cache.clear()
309
310         for k,v in self._entries.items():
311             try:
312                 v.finalize()
313             except Exception as e:
314                 _logger.exception("Error during finalize of inode %i", k)
315
316         self._entries.clear()
317
318
319 def catch_exceptions(orig_func):
320     """Catch uncaught exceptions and log them consistently."""
321
322     @functools.wraps(orig_func)
323     def catch_exceptions_wrapper(self, *args, **kwargs):
324         try:
325             return orig_func(self, *args, **kwargs)
326         except llfuse.FUSEError:
327             raise
328         except EnvironmentError as e:
329             raise llfuse.FUSEError(e.errno)
330         except arvados.errors.KeepWriteError as e:
331             _logger.error("Keep write error: " + str(e))
332             raise llfuse.FUSEError(errno.EIO)
333         except arvados.errors.NotFoundError as e:
334             _logger.error("Block not found error: " + str(e))
335             raise llfuse.FUSEError(errno.EIO)
336         except:
337             _logger.exception("Unhandled exception during FUSE operation")
338             raise llfuse.FUSEError(errno.EIO)
339
340     return catch_exceptions_wrapper
341
342
343 class Operations(llfuse.Operations):
344     """This is the main interface with llfuse.
345
346     The methods on this object are called by llfuse threads to service FUSE
347     events to query and read from the file system.
348
349     llfuse has its own global lock which is acquired before calling a request handler,
350     so request handlers do not run concurrently unless the lock is explicitly released
351     using 'with llfuse.lock_released:'
352
353     """
354
355     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
356     read_time = fuse_time.labels(op='read')
357     write_time = fuse_time.labels(op='write')
358     destroy_time = fuse_time.labels(op='destroy')
359     on_event_time = fuse_time.labels(op='on_event')
360     getattr_time = fuse_time.labels(op='getattr')
361     setattr_time = fuse_time.labels(op='setattr')
362     lookup_time = fuse_time.labels(op='lookup')
363     forget_time = fuse_time.labels(op='forget')
364     open_time = fuse_time.labels(op='open')
365     release_time = fuse_time.labels(op='release')
366     opendir_time = fuse_time.labels(op='opendir')
367     readdir_time = fuse_time.labels(op='readdir')
368     statfs_time = fuse_time.labels(op='statfs')
369     create_time = fuse_time.labels(op='create')
370     mkdir_time = fuse_time.labels(op='mkdir')
371     unlink_time = fuse_time.labels(op='unlink')
372     rmdir_time = fuse_time.labels(op='rmdir')
373     rename_time = fuse_time.labels(op='rename')
374     flush_time = fuse_time.labels(op='flush')
375
376     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
377         super(Operations, self).__init__()
378
379         self._api_client = api_client
380
381         if not inode_cache:
382             inode_cache = InodeCache(cap=256*1024*1024)
383         self.inodes = Inodes(inode_cache, encoding=encoding)
384         self.uid = uid
385         self.gid = gid
386         self.enable_write = enable_write
387
388         # dict of inode to filehandle
389         self._filehandles = {}
390         self._filehandles_counter = itertools.count(0)
391
392         # Other threads that need to wait until the fuse driver
393         # is fully initialized should wait() on this event object.
394         self.initlock = threading.Event()
395
396         # If we get overlapping shutdown events (e.g., fusermount -u
397         # -z and operations.destroy()) llfuse calls forget() on inodes
398         # that have already been deleted. To avoid this, we make
399         # forget() a no-op if called after destroy().
400         self._shutdown_started = threading.Event()
401
402         self.num_retries = num_retries
403
404         self.read_counter = arvados.keep.Counter()
405         self.write_counter = arvados.keep.Counter()
406         self.read_ops_counter = arvados.keep.Counter()
407         self.write_ops_counter = arvados.keep.Counter()
408
409         self.events = None
410
411     def init(self):
412         # Allow threads that are waiting for the driver to be finished
413         # initializing to continue
414         self.initlock.set()
415
416     def metric_samples(self):
417         return self.fuse_time.collect()[0].samples 
418
419     def metric_op_names(self):
420         ops = []
421         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
422             if cur_op not in ops:
423                 ops.append(cur_op)
424         return ops
425
426     def metric_value(self, opname, metric):
427         op_value = [sample.value for sample in self.metric_samples()
428                     if sample.name == metric and sample.labels['op'] == opname]
429         return op_value[0] if len(op_value) == 1 else None
430
431     def metric_sum_func(self, opname):
432         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
433
434     def metric_count_func(self, opname):
435         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
436
437     @destroy_time.time()
438     @catch_exceptions
439     def destroy(self):
440         self._shutdown_started.set()
441         if self.events:
442             self.events.close()
443             self.events = None
444
445         # Different versions of llfuse require and forbid us to
446         # acquire the lock here. See #8345#note-37, #10805#note-9.
447         if LLFUSE_VERSION_0 and llfuse.lock.acquire():
448             # llfuse < 0.42
449             self.inodes.clear()
450             llfuse.lock.release()
451         else:
452             # llfuse >= 0.42
453             self.inodes.clear()
454
455     def access(self, inode, mode, ctx):
456         return True
457
458     def listen_for_events(self):
459         self.events = arvados.events.subscribe(
460             self._api_client,
461             [["event_type", "in", ["create", "update", "delete"]]],
462             self.on_event)
463
464     @on_event_time.time()
465     @catch_exceptions
466     def on_event(self, ev):
467         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
468             return
469         with llfuse.lock:
470             properties = ev.get("properties") or {}
471             old_attrs = properties.get("old_attributes") or {}
472             new_attrs = properties.get("new_attributes") or {}
473
474             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
475                 item.invalidate()
476                 if ev.get("object_kind") == "arvados#collection":
477                     pdh = new_attrs.get("portable_data_hash")
478                     # new_attributes.modified_at currently lacks
479                     # subsecond precision (see #6347) so use event_at
480                     # which should always be the same.
481                     stamp = ev.get("event_at")
482                     if (stamp and pdh and item.writable() and
483                         item.collection is not None and
484                         item.collection.modified() and
485                         new_attrs.get("is_trashed") is not True):
486                         item.update(to_record_version=(stamp, pdh))
487
488             oldowner = old_attrs.get("owner_uuid")
489             newowner = ev.get("object_owner_uuid")
490             for parent in (
491                     self.inodes.inode_cache.find_by_uuid(oldowner) +
492                     self.inodes.inode_cache.find_by_uuid(newowner)):
493                 parent.child_event(ev)
494
495     @getattr_time.time()
496     @catch_exceptions
497     def getattr(self, inode, ctx=None):
498         if inode not in self.inodes:
499             raise llfuse.FUSEError(errno.ENOENT)
500
501         e = self.inodes[inode]
502
503         entry = llfuse.EntryAttributes()
504         entry.st_ino = inode
505         entry.generation = 0
506         entry.entry_timeout = 0
507         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
508
509         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
510         if isinstance(e, Directory):
511             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
512         else:
513             entry.st_mode |= stat.S_IFREG
514             if isinstance(e, FuseArvadosFile):
515                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
516
517         if self.enable_write and e.writable():
518             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
519
520         entry.st_nlink = 1
521         entry.st_uid = self.uid
522         entry.st_gid = self.gid
523         entry.st_rdev = 0
524
525         entry.st_size = e.size()
526
527         entry.st_blksize = 512
528         entry.st_blocks = (entry.st_size/512)+1
529         if hasattr(entry, 'st_atime_ns'):
530             # llfuse >= 0.42
531             entry.st_atime_ns = int(e.atime() * 1000000000)
532             entry.st_mtime_ns = int(e.mtime() * 1000000000)
533             entry.st_ctime_ns = int(e.mtime() * 1000000000)
534         else:
535             # llfuse < 0.42
536             entry.st_atime = int(e.atime)
537             entry.st_mtime = int(e.mtime)
538             entry.st_ctime = int(e.mtime)
539
540         return entry
541
542     @setattr_time.time()
543     @catch_exceptions
544     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
545         entry = self.getattr(inode)
546
547         if fh is not None and fh in self._filehandles:
548             handle = self._filehandles[fh]
549             e = handle.obj
550         else:
551             e = self.inodes[inode]
552
553         if fields is None:
554             # llfuse < 0.42
555             update_size = attr.st_size is not None
556         else:
557             # llfuse >= 0.42
558             update_size = fields.update_size
559         if update_size and isinstance(e, FuseArvadosFile):
560             with llfuse.lock_released:
561                 e.arvfile.truncate(attr.st_size)
562                 entry.st_size = e.arvfile.size()
563
564         return entry
565
566     @lookup_time.time()
567     @catch_exceptions
568     def lookup(self, parent_inode, name, ctx=None):
569         name = unicode(name, self.inodes.encoding)
570         inode = None
571
572         if name == '.':
573             inode = parent_inode
574         else:
575             if parent_inode in self.inodes:
576                 p = self.inodes[parent_inode]
577                 self.inodes.touch(p)
578                 if name == '..':
579                     inode = p.parent_inode
580                 elif isinstance(p, Directory) and name in p:
581                     inode = p[name].inode
582
583         if inode != None:
584             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
585                       parent_inode, name, inode)
586             self.inodes[inode].inc_ref()
587             return self.getattr(inode)
588         else:
589             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
590                       parent_inode, name)
591             raise llfuse.FUSEError(errno.ENOENT)
592
593     @forget_time.time()
594     @catch_exceptions
595     def forget(self, inodes):
596         if self._shutdown_started.is_set():
597             return
598         for inode, nlookup in inodes:
599             ent = self.inodes[inode]
600             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
601             if ent.dec_ref(nlookup) == 0 and ent.dead:
602                 self.inodes.del_entry(ent)
603
604     @open_time.time()
605     @catch_exceptions
606     def open(self, inode, flags, ctx=None):
607         if inode in self.inodes:
608             p = self.inodes[inode]
609         else:
610             raise llfuse.FUSEError(errno.ENOENT)
611
612         if isinstance(p, Directory):
613             raise llfuse.FUSEError(errno.EISDIR)
614
615         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
616             raise llfuse.FUSEError(errno.EPERM)
617
618         fh = next(self._filehandles_counter)
619         self._filehandles[fh] = FileHandle(fh, p)
620         self.inodes.touch(p)
621
622         # Normally, we will have received an "update" event if the
623         # parent collection is stale here. However, even if the parent
624         # collection hasn't changed, the manifest might have been
625         # fetched so long ago that the signatures on the data block
626         # locators have expired. Calling checkupdate() on all
627         # ancestors ensures the signatures will be refreshed if
628         # necessary.
629         while p.parent_inode in self.inodes:
630             if p == self.inodes[p.parent_inode]:
631                 break
632             p = self.inodes[p.parent_inode]
633             self.inodes.touch(p)
634             p.checkupdate()
635
636         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
637
638         return fh
639
640     @read_time.time()
641     @catch_exceptions
642     def read(self, fh, off, size):
643         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
644         self.read_ops_counter.add(1)
645
646         if fh in self._filehandles:
647             handle = self._filehandles[fh]
648         else:
649             raise llfuse.FUSEError(errno.EBADF)
650
651         self.inodes.touch(handle.obj)
652
653         r = handle.obj.readfrom(off, size, self.num_retries)
654         if r:
655             self.read_counter.add(len(r))
656         return r
657
658     @write_time.time()
659     @catch_exceptions
660     def write(self, fh, off, buf):
661         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
662         self.write_ops_counter.add(1)
663
664         if fh in self._filehandles:
665             handle = self._filehandles[fh]
666         else:
667             raise llfuse.FUSEError(errno.EBADF)
668
669         if not handle.obj.writable():
670             raise llfuse.FUSEError(errno.EPERM)
671
672         self.inodes.touch(handle.obj)
673
674         w = handle.obj.writeto(off, buf, self.num_retries)
675         if w:
676             self.write_counter.add(w)
677         return w
678
679     @release_time.time()
680     @catch_exceptions
681     def release(self, fh):
682         if fh in self._filehandles:
683             _logger.debug("arv-mount release fh %i", fh)
684             try:
685                 self._filehandles[fh].flush()
686             except Exception:
687                 raise
688             finally:
689                 self._filehandles[fh].release()
690                 del self._filehandles[fh]
691         self.inodes.inode_cache.cap_cache()
692
693     def releasedir(self, fh):
694         self.release(fh)
695
696     @opendir_time.time()
697     @catch_exceptions
698     def opendir(self, inode, ctx=None):
699         _logger.debug("arv-mount opendir: inode %i", inode)
700
701         if inode in self.inodes:
702             p = self.inodes[inode]
703         else:
704             raise llfuse.FUSEError(errno.ENOENT)
705
706         if not isinstance(p, Directory):
707             raise llfuse.FUSEError(errno.ENOTDIR)
708
709         fh = next(self._filehandles_counter)
710         if p.parent_inode in self.inodes:
711             parent = self.inodes[p.parent_inode]
712         else:
713             raise llfuse.FUSEError(errno.EIO)
714
715         # update atime
716         self.inodes.touch(p)
717
718         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
719         return fh
720
721     @readdir_time.time()
722     @catch_exceptions
723     def readdir(self, fh, off):
724         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
725
726         if fh in self._filehandles:
727             handle = self._filehandles[fh]
728         else:
729             raise llfuse.FUSEError(errno.EBADF)
730
731         e = off
732         while e < len(handle.entries):
733             if handle.entries[e][1].inode in self.inodes:
734                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
735             e += 1
736
737     @statfs_time.time()
738     @catch_exceptions
739     def statfs(self, ctx=None):
740         st = llfuse.StatvfsData()
741         st.f_bsize = 128 * 1024
742         st.f_blocks = 0
743         st.f_files = 0
744
745         st.f_bfree = 0
746         st.f_bavail = 0
747
748         st.f_ffree = 0
749         st.f_favail = 0
750
751         st.f_frsize = 0
752         return st
753
754     def _check_writable(self, inode_parent):
755         if not self.enable_write:
756             raise llfuse.FUSEError(errno.EROFS)
757
758         if inode_parent in self.inodes:
759             p = self.inodes[inode_parent]
760         else:
761             raise llfuse.FUSEError(errno.ENOENT)
762
763         if not isinstance(p, Directory):
764             raise llfuse.FUSEError(errno.ENOTDIR)
765
766         if not p.writable():
767             raise llfuse.FUSEError(errno.EPERM)
768
769         return p
770
771     @create_time.time()
772     @catch_exceptions
773     def create(self, inode_parent, name, mode, flags, ctx=None):
774         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
775
776         p = self._check_writable(inode_parent)
777         p.create(name)
778
779         # The file entry should have been implicitly created by callback.
780         f = p[name]
781         fh = next(self._filehandles_counter)
782         self._filehandles[fh] = FileHandle(fh, f)
783         self.inodes.touch(p)
784
785         f.inc_ref()
786         return (fh, self.getattr(f.inode))
787
788     @mkdir_time.time()
789     @catch_exceptions
790     def mkdir(self, inode_parent, name, mode, ctx=None):
791         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
792
793         p = self._check_writable(inode_parent)
794         p.mkdir(name)
795
796         # The dir entry should have been implicitly created by callback.
797         d = p[name]
798
799         d.inc_ref()
800         return self.getattr(d.inode)
801
802     @unlink_time.time()
803     @catch_exceptions
804     def unlink(self, inode_parent, name, ctx=None):
805         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
806         p = self._check_writable(inode_parent)
807         p.unlink(name)
808
809     @rmdir_time.time()
810     @catch_exceptions
811     def rmdir(self, inode_parent, name, ctx=None):
812         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
813         p = self._check_writable(inode_parent)
814         p.rmdir(name)
815
816     @rename_time.time()
817     @catch_exceptions
818     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
819         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
820         src = self._check_writable(inode_parent_old)
821         dest = self._check_writable(inode_parent_new)
822         dest.rename(name_old, name_new, src)
823
824     @flush_time.time()
825     @catch_exceptions
826     def flush(self, fh):
827         if fh in self._filehandles:
828             self._filehandles[fh].flush()
829
830     def fsync(self, fh, datasync):
831         self.flush(fh)
832
833     def fsyncdir(self, fh, datasync):
834         self.flush(fh)