21541: Fix typo
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 Implementation note: in the code, the terms 'object', 'entry' and
51 'inode' are used somewhat interchangeably, but generally mean an
52 arvados_fuse.File or arvados_fuse.Directory object which has numeric
53 inode assigned to it and appears in the Inodes._entries dictionary.
54
55 """
56
57 from __future__ import absolute_import
58 from __future__ import division
59 from builtins import next
60 from builtins import str
61 from builtins import object
62 import os
63 import llfuse
64 import errno
65 import stat
66 import threading
67 import arvados
68 import arvados.events
69 import logging
70 import time
71 import threading
72 import itertools
73 import collections
74 import functools
75 import arvados.keep
76 from prometheus_client import Summary
77 import queue
78 from dataclasses import dataclass
79 import typing
80 import gc
81
82 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
83 from .fusefile import File, StringFile, FuseArvadosFile
84
85 _logger = logging.getLogger('arvados.arvados_fuse')
86
87 # Uncomment this to enable llfuse debug logging.
88 # log_handler = logging.StreamHandler()
89 # llogger = logging.getLogger('llfuse')
90 # llogger.addHandler(log_handler)
91 # llogger.setLevel(logging.DEBUG)
92
93 class Handle(object):
94     """Connects a numeric file handle to a File or Directory object that has
95     been opened by the client."""
96
97     def __init__(self, fh, obj):
98         self.fh = fh
99         self.obj = obj
100         self.obj.inc_use()
101
102     def release(self):
103         self.obj.dec_use()
104
105     def flush(self):
106         pass
107
108
109 class FileHandle(Handle):
110     """Connects a numeric file handle to a File  object that has
111     been opened by the client."""
112
113     def flush(self):
114         if self.obj.writable():
115             return self.obj.flush()
116
117
118 class DirectoryHandle(Handle):
119     """Connects a numeric file handle to a Directory object that has
120     been opened by the client.
121
122     DirectoryHandle is used by opendir() and readdir() to get
123     directory listings.  Entries returned by readdir() don't increment
124     the lookup count (kernel references), so increment our internal
125     "use count" to avoid having an item being removed mid-read.
126
127     """
128
129     def __init__(self, fh, dirobj, entries):
130         super(DirectoryHandle, self).__init__(fh, dirobj)
131         self.entries = entries
132
133         for ent in self.entries:
134             ent[1].inc_use()
135
136     def release(self):
137         for ent in self.entries:
138             ent[1].dec_use()
139         super(DirectoryHandle, self).release()
140
141
142 class InodeCache(object):
143     """Records the memory footprint of objects and when they are last used.
144
145     When the cache limit is exceeded, the least recently used objects
146     are cleared.  Clearing the object means discarding its contents to
147     release memory.  The next time the object is accessed, it must be
148     re-fetched from the server.  Note that the inode cache limit is a
149     soft limit; the cache limit may be exceeded if necessary to load
150     very large projects or collections, it may also be exceeded if an
151     inode can't be safely discarded based on kernel lookups
152     (has_ref()) or internal use count (in_use()).
153
154     """
155
156     def __init__(self, cap, min_entries=4):
157         self._cache_entries = collections.OrderedDict()
158         self.cap = cap
159         self._total = 0
160         self.min_entries = min_entries
161
162     def total(self):
163         return self._total
164
165     def evict_candidates(self):
166         """Yield entries that are candidates to be evicted
167         and stop when the cache total has shrunk sufficiently.
168
169         Implements a LRU cache, when an item is added or touch()ed it
170         goes to the back of the OrderedDict, so items in the front are
171         oldest.  The Inodes._remove() function determines if the entry
172         can actually be removed safely.
173
174         """
175
176         if self._total <= self.cap:
177             return
178
179         _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
180
181         # Copy this into a deque for two reasons:
182         #
183         # 1. _cache_entries is modified by unmanage() which is called
184         # by _remove
185         #
186         # 2. popping off the front means the reference goes away
187         # immediately intead of sticking around for the lifetime of
188         # "values"
189         values = collections.deque(self._cache_entries.values())
190
191         while len(values) > 0:
192             if self._total < self.cap or len(self._cache_entries) < self.min_entries:
193                 break
194             yield values.popleft()
195
196     def unmanage(self, entry):
197         """Stop managing an object in the cache.
198
199         This happens when an object is being removed from the inode
200         entries table.
201
202         """
203
204         if entry.inode not in self._cache_entries:
205             return
206
207         # manage cache size running sum
208         self._total -= entry.cache_size
209         entry.cache_size = 0
210
211         # Now forget about it
212         del self._cache_entries[entry.inode]
213
214     def update_cache_size(self, obj):
215         """Update the cache total in response to the footprint of an
216         object changing (usually because it has been loaded or
217         cleared).
218
219         Adds or removes entries to the cache list based on the object
220         cache size.
221
222         """
223
224         if not obj.persisted():
225             return
226
227         if obj.inode in self._cache_entries:
228             self._total -= obj.cache_size
229
230         obj.cache_size = obj.objsize()
231
232         if obj.cache_size > 0 or obj.parent_inode is None:
233             self._total += obj.cache_size
234             self._cache_entries[obj.inode] = obj
235         elif obj.cache_size == 0 and obj.inode in self._cache_entries:
236             del self._cache_entries[obj.inode]
237
238     def touch(self, obj):
239         """Indicate an object was used recently, making it low
240         priority to be removed from the cache.
241
242         """
243         if obj.inode in self._cache_entries:
244             self._cache_entries.move_to_end(obj.inode)
245             return True
246         return False
247
248     def clear(self):
249         self._cache_entries.clear()
250         self._total = 0
251
252 @dataclass
253 class RemoveInode:
254     entry: typing.Union[Directory, File]
255     def inode_op(self, inodes, locked_ops):
256         if locked_ops is None:
257             inodes._remove(self.entry)
258             return True
259         else:
260             locked_ops.append(self)
261             return False
262
263 @dataclass
264 class InvalidateInode:
265     inode: int
266     def inode_op(self, inodes, locked_ops):
267         llfuse.invalidate_inode(self.inode)
268         return True
269
270 @dataclass
271 class InvalidateEntry:
272     inode: int
273     name: str
274     def inode_op(self, inodes, locked_ops):
275         llfuse.invalidate_entry(self.inode, self.name)
276         return True
277
278 @dataclass
279 class EvictCandidates:
280     def inode_op(self, inodes, locked_ops):
281         return True
282
283
284 class Inodes(object):
285     """Manage the set of inodes.
286
287     This is the mapping from a numeric id to a concrete File or
288     Directory object
289
290     """
291
292     def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
293         self._entries = {}
294         self._counter = itertools.count(llfuse.ROOT_INODE)
295         self.inode_cache = inode_cache
296         self.encoding = encoding
297         self._fsns = fsns
298         self._shutdown_started = shutdown_started or threading.Event()
299
300         self._inode_remove_queue = queue.Queue()
301         self._inode_remove_thread = threading.Thread(None, self._inode_remove)
302         self._inode_remove_thread.daemon = True
303         self._inode_remove_thread.start()
304
305         self.cap_cache_event = threading.Event()
306         self._by_uuid = collections.defaultdict(list)
307
308     def __getitem__(self, item):
309         return self._entries[item]
310
311     def __setitem__(self, key, item):
312         self._entries[key] = item
313
314     def __iter__(self):
315         return iter(self._entries.keys())
316
317     def items(self):
318         return self._entries.items()
319
320     def __contains__(self, k):
321         return k in self._entries
322
323     def touch(self, entry):
324         """Update the access time, adjust the cache position, and
325         notify the _inode_remove thread to recheck the cache.
326
327         """
328
329         entry._atime = time.time()
330         if self.inode_cache.touch(entry):
331             self.cap_cache()
332
333     def cap_cache(self):
334         """Notify the _inode_remove thread to recheck the cache."""
335         if not self.cap_cache_event.is_set():
336             self.cap_cache_event.set()
337             self._inode_remove_queue.put(EvictCandidates())
338
339     def update_uuid(self, entry):
340         """Update the Arvados uuid associated with an inode entry.
341
342         This is used to look up inodes that need to be invalidated
343         when a websocket event indicates the object has changed on the
344         API server.
345
346         """
347         if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
348             self._by_uuid[entry.cache_uuid].remove(entry)
349
350         entry.cache_uuid = entry.uuid()
351         if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
352             self._by_uuid[entry.cache_uuid].append(entry)
353
354         if not self._by_uuid[entry.cache_uuid]:
355             del self._by_uuid[entry.cache_uuid]
356
357     def add_entry(self, entry):
358         """Assign a numeric inode to a new entry."""
359
360         entry.inode = next(self._counter)
361         if entry.inode == llfuse.ROOT_INODE:
362             entry.inc_ref()
363         self._entries[entry.inode] = entry
364
365         self.update_uuid(entry)
366         self.inode_cache.update_cache_size(entry)
367         self.cap_cache()
368         return entry
369
370     def del_entry(self, entry):
371         """Remove entry from the inode table.
372
373         Indicate this inode entry is pending deletion by setting
374         parent_inode to None.  Notify the _inode_remove thread to try
375         and remove it.
376
377         """
378
379         entry.parent_inode = None
380         self._inode_remove_queue.put(RemoveInode(entry))
381         _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
382
383     def _inode_remove(self):
384         """Background thread to handle tasks related to invalidating
385         inodes in the kernel, and removing objects from the inodes
386         table entirely.
387
388         """
389
390         locked_ops = collections.deque()
391         while True:
392             blocking_get = True
393             while True:
394                 try:
395                     qentry = self._inode_remove_queue.get(blocking_get)
396                 except queue.Empty:
397                     break
398                 blocking_get = False
399                 if qentry is None:
400                     return
401
402                 if self._shutdown_started.is_set():
403                     continue
404
405                 # Process this entry
406                 if qentry.inode_op(self, locked_ops):
407                     self._inode_remove_queue.task_done()
408
409                 # Give up the reference
410                 qentry = None
411
412             with llfuse.lock:
413                 while len(locked_ops) > 0:
414                     if locked_ops.popleft().inode_op(self, None):
415                         self._inode_remove_queue.task_done()
416                 self.cap_cache_event.clear()
417                 for entry in self.inode_cache.evict_candidates():
418                     self._remove(entry)
419
420     def wait_remove_queue_empty(self):
421         # used by tests
422         self._inode_remove_queue.join()
423
424     def _remove(self, entry):
425         """Remove an inode entry if possible.
426
427         If the entry is still referenced or in use, don't do anything.
428         If this is not referenced but the parent is still referenced,
429         clear any data held by the object (which may include directory
430         entries under the object) but don't remove it from the inode
431         table.
432
433         """
434         try:
435             if entry.inode is None:
436                 # Removed already
437                 return
438
439             if entry.inode == llfuse.ROOT_INODE:
440                 return
441
442             if entry.in_use():
443                 # referenced internally, stay pinned
444                 #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
445                 return
446
447             # Tell the kernel it should forget about it
448             entry.kernel_invalidate()
449
450             if entry.has_ref():
451                 # has kernel reference, could still be accessed.
452                 # when the kernel forgets about it, we can delete it.
453                 #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
454                 return
455
456             # commit any pending changes
457             with llfuse.lock_released:
458                 entry.finalize()
459
460             # Clear the contents
461             entry.clear()
462
463             if entry.parent_inode is None:
464                 _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
465                               entry.inode, entry.cache_size, self.inode_cache.total(),
466                               len(self._entries), type(entry))
467
468                 if entry.cache_uuid:
469                     self._by_uuid[entry.cache_uuid].remove(entry)
470                     if not self._by_uuid[entry.cache_uuid]:
471                         del self._by_uuid[entry.cache_uuid]
472                     entry.cache_uuid = None
473
474                 self.inode_cache.unmanage(entry)
475
476                 del self._entries[entry.inode]
477                 entry.inode = None
478
479         except Exception as e:
480             _logger.exception("failed remove")
481
482     def invalidate_inode(self, entry):
483         if entry.has_ref():
484             # Only necessary if the kernel has previously done a lookup on this
485             # inode and hasn't yet forgotten about it.
486             self._inode_remove_queue.put(InvalidateInode(entry.inode))
487
488     def invalidate_entry(self, entry, name):
489         if entry.has_ref():
490             # Only necessary if the kernel has previously done a lookup on this
491             # inode and hasn't yet forgotten about it.
492             self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
493
494     def begin_shutdown(self):
495         self._inode_remove_queue.put(None)
496         if self._inode_remove_thread is not None:
497             self._inode_remove_thread.join()
498         self._inode_remove_thread = None
499
500     def clear(self):
501         with llfuse.lock_released:
502             self.begin_shutdown()
503
504         self.inode_cache.clear()
505         self._by_uuid.clear()
506
507         for k,v in self._entries.items():
508             try:
509                 v.finalize()
510             except Exception as e:
511                 _logger.exception("Error during finalize of inode %i", k)
512
513         self._entries.clear()
514
515     def forward_slash_subst(self):
516         return self._fsns
517
518     def find_by_uuid(self, uuid):
519         """Return a list of zero or more inode entries corresponding
520         to this Arvados UUID."""
521         return self._by_uuid.get(uuid, [])
522
523
524 def catch_exceptions(orig_func):
525     """Catch uncaught exceptions and log them consistently."""
526
527     @functools.wraps(orig_func)
528     def catch_exceptions_wrapper(self, *args, **kwargs):
529         try:
530             return orig_func(self, *args, **kwargs)
531         except llfuse.FUSEError:
532             raise
533         except EnvironmentError as e:
534             raise llfuse.FUSEError(e.errno)
535         except NotImplementedError:
536             raise llfuse.FUSEError(errno.ENOTSUP)
537         except arvados.errors.KeepWriteError as e:
538             _logger.error("Keep write error: " + str(e))
539             raise llfuse.FUSEError(errno.EIO)
540         except arvados.errors.NotFoundError as e:
541             _logger.error("Block not found error: " + str(e))
542             raise llfuse.FUSEError(errno.EIO)
543         except:
544             _logger.exception("Unhandled exception during FUSE operation")
545             raise llfuse.FUSEError(errno.EIO)
546
547     return catch_exceptions_wrapper
548
549
550 class Operations(llfuse.Operations):
551     """This is the main interface with llfuse.
552
553     The methods on this object are called by llfuse threads to service FUSE
554     events to query and read from the file system.
555
556     llfuse has its own global lock which is acquired before calling a request handler,
557     so request handlers do not run concurrently unless the lock is explicitly released
558     using 'with llfuse.lock_released:'
559
560     """
561
562     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
563     read_time = fuse_time.labels(op='read')
564     write_time = fuse_time.labels(op='write')
565     destroy_time = fuse_time.labels(op='destroy')
566     on_event_time = fuse_time.labels(op='on_event')
567     getattr_time = fuse_time.labels(op='getattr')
568     setattr_time = fuse_time.labels(op='setattr')
569     lookup_time = fuse_time.labels(op='lookup')
570     forget_time = fuse_time.labels(op='forget')
571     open_time = fuse_time.labels(op='open')
572     release_time = fuse_time.labels(op='release')
573     opendir_time = fuse_time.labels(op='opendir')
574     readdir_time = fuse_time.labels(op='readdir')
575     statfs_time = fuse_time.labels(op='statfs')
576     create_time = fuse_time.labels(op='create')
577     mkdir_time = fuse_time.labels(op='mkdir')
578     unlink_time = fuse_time.labels(op='unlink')
579     rmdir_time = fuse_time.labels(op='rmdir')
580     rename_time = fuse_time.labels(op='rename')
581     flush_time = fuse_time.labels(op='flush')
582
583     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
584         super(Operations, self).__init__()
585
586         self._api_client = api_client
587
588         if not inode_cache:
589             inode_cache = InodeCache(cap=256*1024*1024)
590
591         if fsns is None:
592             try:
593                 fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
594             except KeyError:
595                 # old API server with no FSNS config
596                 fsns = '_'
597             else:
598                 if fsns == '' or fsns == '/':
599                     fsns = None
600
601         # If we get overlapping shutdown events (e.g., fusermount -u
602         # -z and operations.destroy()) llfuse calls forget() on inodes
603         # that have already been deleted. To avoid this, we make
604         # forget() a no-op if called after destroy().
605         self._shutdown_started = threading.Event()
606
607         self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
608                              shutdown_started=self._shutdown_started)
609         self.uid = uid
610         self.gid = gid
611         self.enable_write = enable_write
612
613         # dict of inode to filehandle
614         self._filehandles = {}
615         self._filehandles_counter = itertools.count(0)
616
617         # Other threads that need to wait until the fuse driver
618         # is fully initialized should wait() on this event object.
619         self.initlock = threading.Event()
620
621         self.num_retries = num_retries
622
623         self.read_counter = arvados.keep.Counter()
624         self.write_counter = arvados.keep.Counter()
625         self.read_ops_counter = arvados.keep.Counter()
626         self.write_ops_counter = arvados.keep.Counter()
627
628         self.events = None
629
630         # We rely on the cyclic garbage collector to deallocate
631         # Collection objects from the Python SDK.  A lower GC
632         # threshold encourages Python to be more aggressive in
633         # reclaiming these and seems to slow down the growth in memory
634         # usage over time.
635         gc.set_threshold(200)
636
637     def init(self):
638         # Allow threads that are waiting for the driver to be finished
639         # initializing to continue
640         self.initlock.set()
641
642     def metric_samples(self):
643         return self.fuse_time.collect()[0].samples
644
645     def metric_op_names(self):
646         ops = []
647         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
648             if cur_op not in ops:
649                 ops.append(cur_op)
650         return ops
651
652     def metric_value(self, opname, metric):
653         op_value = [sample.value for sample in self.metric_samples()
654                     if sample.name == metric and sample.labels['op'] == opname]
655         return op_value[0] if len(op_value) == 1 else None
656
657     def metric_sum_func(self, opname):
658         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
659
660     def metric_count_func(self, opname):
661         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
662
663     def begin_shutdown(self):
664         self._shutdown_started.set()
665         self.inodes.begin_shutdown()
666
667     @destroy_time.time()
668     @catch_exceptions
669     def destroy(self):
670         _logger.debug("arv-mount destroy: start")
671
672         with llfuse.lock_released:
673             self.begin_shutdown()
674
675         if self.events:
676             self.events.close()
677             self.events = None
678
679         self.inodes.clear()
680
681         _logger.debug("arv-mount destroy: complete")
682
683
684     def access(self, inode, mode, ctx):
685         return True
686
687     def listen_for_events(self):
688         self.events = arvados.events.subscribe(
689             self._api_client,
690             [["event_type", "in", ["create", "update", "delete"]]],
691             self.on_event)
692
693     @on_event_time.time()
694     @catch_exceptions
695     def on_event(self, ev):
696         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
697             return
698         with llfuse.lock:
699             properties = ev.get("properties") or {}
700             old_attrs = properties.get("old_attributes") or {}
701             new_attrs = properties.get("new_attributes") or {}
702
703             for item in self.inodes.find_by_uuid(ev["object_uuid"]):
704                 item.invalidate()
705
706             oldowner = old_attrs.get("owner_uuid")
707             newowner = ev.get("object_owner_uuid")
708             for parent in (
709                     self.inodes.find_by_uuid(oldowner) +
710                     self.inodes.find_by_uuid(newowner)):
711                 parent.invalidate()
712
713     @getattr_time.time()
714     @catch_exceptions
715     def getattr(self, inode, ctx=None):
716         if inode not in self.inodes:
717             _logger.debug("arv-mount getattr: inode %i missing", inode)
718             raise llfuse.FUSEError(errno.ENOENT)
719
720         e = self.inodes[inode]
721         self.inodes.touch(e)
722         parent = None
723         if e.parent_inode:
724             parent = self.inodes[e.parent_inode]
725             self.inodes.touch(parent)
726
727         entry = llfuse.EntryAttributes()
728         entry.st_ino = inode
729         entry.generation = 0
730         entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
731         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
732
733         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
734         if isinstance(e, Directory):
735             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
736         else:
737             entry.st_mode |= stat.S_IFREG
738             if isinstance(e, FuseArvadosFile):
739                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
740
741         if self.enable_write and e.writable():
742             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
743
744         entry.st_nlink = 1
745         entry.st_uid = self.uid
746         entry.st_gid = self.gid
747         entry.st_rdev = 0
748
749         entry.st_size = e.size()
750
751         entry.st_blksize = 512
752         entry.st_blocks = (entry.st_size // 512) + 1
753         if hasattr(entry, 'st_atime_ns'):
754             # llfuse >= 0.42
755             entry.st_atime_ns = int(e.atime() * 1000000000)
756             entry.st_mtime_ns = int(e.mtime() * 1000000000)
757             entry.st_ctime_ns = int(e.mtime() * 1000000000)
758         else:
759             # llfuse < 0.42
760             entry.st_atime = int(e.atime)
761             entry.st_mtime = int(e.mtime)
762             entry.st_ctime = int(e.mtime)
763
764         return entry
765
766     @setattr_time.time()
767     @catch_exceptions
768     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
769         entry = self.getattr(inode)
770
771         if fh is not None and fh in self._filehandles:
772             handle = self._filehandles[fh]
773             e = handle.obj
774         else:
775             e = self.inodes[inode]
776
777         if fields is None:
778             # llfuse < 0.42
779             update_size = attr.st_size is not None
780         else:
781             # llfuse >= 0.42
782             update_size = fields.update_size
783         if update_size and isinstance(e, FuseArvadosFile):
784             with llfuse.lock_released:
785                 e.arvfile.truncate(attr.st_size)
786                 entry.st_size = e.arvfile.size()
787
788         return entry
789
790     @lookup_time.time()
791     @catch_exceptions
792     def lookup(self, parent_inode, name, ctx=None):
793         name = str(name, self.inodes.encoding)
794         inode = None
795
796         if name == '.':
797             inode = parent_inode
798         elif parent_inode in self.inodes:
799             p = self.inodes[parent_inode]
800             self.inodes.touch(p)
801             if name == '..':
802                 inode = p.parent_inode
803             elif isinstance(p, Directory) and name in p:
804                 if p[name].inode is None:
805                     _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
806                                   parent_inode, name)
807                     raise llfuse.FUSEError(errno.ENOENT)
808
809                 inode = p[name].inode
810
811         if inode != None:
812             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
813                       parent_inode, name, inode)
814             self.inodes.touch(self.inodes[inode])
815             self.inodes[inode].inc_ref()
816             return self.getattr(inode)
817         else:
818             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
819                       parent_inode, name)
820             raise llfuse.FUSEError(errno.ENOENT)
821
822     @forget_time.time()
823     @catch_exceptions
824     def forget(self, inodes):
825         if self._shutdown_started.is_set():
826             return
827         for inode, nlookup in inodes:
828             ent = self.inodes[inode]
829             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
830             if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
831                 self.inodes.del_entry(ent)
832
833     @open_time.time()
834     @catch_exceptions
835     def open(self, inode, flags, ctx=None):
836         if inode in self.inodes:
837             p = self.inodes[inode]
838         else:
839             _logger.debug("arv-mount open: inode %i missing", inode)
840             raise llfuse.FUSEError(errno.ENOENT)
841
842         if isinstance(p, Directory):
843             raise llfuse.FUSEError(errno.EISDIR)
844
845         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
846             raise llfuse.FUSEError(errno.EPERM)
847
848         fh = next(self._filehandles_counter)
849         self._filehandles[fh] = FileHandle(fh, p)
850         self.inodes.touch(p)
851
852         # Normally, we will have received an "update" event if the
853         # parent collection is stale here. However, even if the parent
854         # collection hasn't changed, the manifest might have been
855         # fetched so long ago that the signatures on the data block
856         # locators have expired. Calling checkupdate() on all
857         # ancestors ensures the signatures will be refreshed if
858         # necessary.
859         while p.parent_inode in self.inodes:
860             if p == self.inodes[p.parent_inode]:
861                 break
862             p = self.inodes[p.parent_inode]
863             self.inodes.touch(p)
864             p.checkupdate()
865
866         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
867
868         return fh
869
870     @read_time.time()
871     @catch_exceptions
872     def read(self, fh, off, size):
873         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
874         self.read_ops_counter.add(1)
875
876         if fh in self._filehandles:
877             handle = self._filehandles[fh]
878         else:
879             raise llfuse.FUSEError(errno.EBADF)
880
881         self.inodes.touch(handle.obj)
882
883         r = handle.obj.readfrom(off, size, self.num_retries)
884         if r:
885             self.read_counter.add(len(r))
886         return r
887
888     @write_time.time()
889     @catch_exceptions
890     def write(self, fh, off, buf):
891         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
892         self.write_ops_counter.add(1)
893
894         if fh in self._filehandles:
895             handle = self._filehandles[fh]
896         else:
897             raise llfuse.FUSEError(errno.EBADF)
898
899         if not handle.obj.writable():
900             raise llfuse.FUSEError(errno.EPERM)
901
902         self.inodes.touch(handle.obj)
903
904         w = handle.obj.writeto(off, buf, self.num_retries)
905         if w:
906             self.write_counter.add(w)
907         return w
908
909     @release_time.time()
910     @catch_exceptions
911     def release(self, fh):
912         if fh in self._filehandles:
913             _logger.debug("arv-mount release fh %i", fh)
914             try:
915                 self._filehandles[fh].flush()
916             except Exception:
917                 raise
918             finally:
919                 self._filehandles[fh].release()
920                 del self._filehandles[fh]
921         self.inodes.cap_cache()
922
923     def releasedir(self, fh):
924         self.release(fh)
925
926     @opendir_time.time()
927     @catch_exceptions
928     def opendir(self, inode, ctx=None):
929         _logger.debug("arv-mount opendir: inode %i", inode)
930
931         if inode in self.inodes:
932             p = self.inodes[inode]
933         else:
934             _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
935             raise llfuse.FUSEError(errno.ENOENT)
936
937         if not isinstance(p, Directory):
938             raise llfuse.FUSEError(errno.ENOTDIR)
939
940         fh = next(self._filehandles_counter)
941         if p.parent_inode in self.inodes:
942             parent = self.inodes[p.parent_inode]
943         else:
944             _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
945             raise llfuse.FUSEError(errno.EIO)
946
947         _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
948
949         # update atime
950         p.inc_use()
951         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
952         p.dec_use()
953         self.inodes.touch(p)
954         return fh
955
956     @readdir_time.time()
957     @catch_exceptions
958     def readdir(self, fh, off):
959         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
960
961         if fh in self._filehandles:
962             handle = self._filehandles[fh]
963         else:
964             raise llfuse.FUSEError(errno.EBADF)
965
966         e = off
967         while e < len(handle.entries):
968             ent = handle.entries[e]
969             if ent[1].inode in self.inodes:
970                 yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
971             e += 1
972
973     @statfs_time.time()
974     @catch_exceptions
975     def statfs(self, ctx=None):
976         st = llfuse.StatvfsData()
977         st.f_bsize = 128 * 1024
978         st.f_blocks = 0
979         st.f_files = 0
980
981         st.f_bfree = 0
982         st.f_bavail = 0
983
984         st.f_ffree = 0
985         st.f_favail = 0
986
987         st.f_frsize = 0
988         return st
989
990     def _check_writable(self, inode_parent):
991         if not self.enable_write:
992             raise llfuse.FUSEError(errno.EROFS)
993
994         if inode_parent in self.inodes:
995             p = self.inodes[inode_parent]
996         else:
997             raise llfuse.FUSEError(errno.ENOENT)
998
999         if not isinstance(p, Directory):
1000             raise llfuse.FUSEError(errno.ENOTDIR)
1001
1002         if not p.writable():
1003             raise llfuse.FUSEError(errno.EPERM)
1004
1005         return p
1006
1007     @create_time.time()
1008     @catch_exceptions
1009     def create(self, inode_parent, name, mode, flags, ctx=None):
1010         name = name.decode(encoding=self.inodes.encoding)
1011         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
1012
1013         p = self._check_writable(inode_parent)
1014         p.create(name)
1015
1016         # The file entry should have been implicitly created by callback.
1017         f = p[name]
1018         fh = next(self._filehandles_counter)
1019         self._filehandles[fh] = FileHandle(fh, f)
1020         self.inodes.touch(p)
1021
1022         f.inc_ref()
1023         return (fh, self.getattr(f.inode))
1024
1025     @mkdir_time.time()
1026     @catch_exceptions
1027     def mkdir(self, inode_parent, name, mode, ctx=None):
1028         name = name.decode(encoding=self.inodes.encoding)
1029         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
1030
1031         p = self._check_writable(inode_parent)
1032         p.mkdir(name)
1033
1034         # The dir entry should have been implicitly created by callback.
1035         d = p[name]
1036
1037         d.inc_ref()
1038         return self.getattr(d.inode)
1039
1040     @unlink_time.time()
1041     @catch_exceptions
1042     def unlink(self, inode_parent, name, ctx=None):
1043         name = name.decode(encoding=self.inodes.encoding)
1044         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
1045         p = self._check_writable(inode_parent)
1046         p.unlink(name)
1047
1048     @rmdir_time.time()
1049     @catch_exceptions
1050     def rmdir(self, inode_parent, name, ctx=None):
1051         name = name.decode(encoding=self.inodes.encoding)
1052         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
1053         p = self._check_writable(inode_parent)
1054         p.rmdir(name)
1055
1056     @rename_time.time()
1057     @catch_exceptions
1058     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
1059         name_old = name_old.decode(encoding=self.inodes.encoding)
1060         name_new = name_new.decode(encoding=self.inodes.encoding)
1061         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
1062         src = self._check_writable(inode_parent_old)
1063         dest = self._check_writable(inode_parent_new)
1064         dest.rename(name_old, name_new, src)
1065
1066     @flush_time.time()
1067     @catch_exceptions
1068     def flush(self, fh):
1069         if fh in self._filehandles:
1070             self._filehandles[fh].flush()
1071
1072     def fsync(self, fh, datasync):
1073         self.flush(fh)
1074
1075     def fsyncdir(self, fh, datasync):
1076         self.flush(fh)