Merge branch '21535-multi-wf-delete'
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 Implementation note: in the code, the terms 'object', 'entry' and
51 'inode' are used somewhat interchangeably, but generally mean an
52 arvados_fuse.File or arvados_fuse.Directory object which has numeric
53 inode assigned to it and appears in the Inodes._entries dictionary.
54
55 """
56
57 import os
58 import llfuse
59 import errno
60 import stat
61 import threading
62 import arvados
63 import arvados.events
64 import logging
65 import time
66 import threading
67 import itertools
68 import collections
69 import functools
70 import arvados.keep
71 from prometheus_client import Summary
72 import queue
73 from dataclasses import dataclass
74 import typing
75
76 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
77 from .fusefile import File, StringFile, FuseArvadosFile
78
79 _logger = logging.getLogger('arvados.arvados_fuse')
80
81 # Uncomment this to enable llfuse debug logging.
82 # log_handler = logging.StreamHandler()
83 # llogger = logging.getLogger('llfuse')
84 # llogger.addHandler(log_handler)
85 # llogger.setLevel(logging.DEBUG)
86
87 class Handle(object):
88     """Connects a numeric file handle to a File or Directory object that has
89     been opened by the client."""
90
91     def __init__(self, fh, obj):
92         self.fh = fh
93         self.obj = obj
94         self.obj.inc_use()
95
96     def release(self):
97         self.obj.dec_use()
98
99     def flush(self):
100         pass
101
102
103 class FileHandle(Handle):
104     """Connects a numeric file handle to a File  object that has
105     been opened by the client."""
106
107     def flush(self):
108         if self.obj.writable():
109             return self.obj.flush()
110
111
112 class DirectoryHandle(Handle):
113     """Connects a numeric file handle to a Directory object that has
114     been opened by the client.
115
116     DirectoryHandle is used by opendir() and readdir() to get
117     directory listings.  Entries returned by readdir() don't increment
118     the lookup count (kernel references), so increment our internal
119     "use count" to avoid having an item being removed mid-read.
120
121     """
122
123     def __init__(self, fh, dirobj, entries):
124         super(DirectoryHandle, self).__init__(fh, dirobj)
125         self.entries = entries
126
127         for ent in self.entries:
128             ent[1].inc_use()
129
130     def release(self):
131         for ent in self.entries:
132             ent[1].dec_use()
133         super(DirectoryHandle, self).release()
134
135
136 class InodeCache(object):
137     """Records the memory footprint of objects and when they are last used.
138
139     When the cache limit is exceeded, the least recently used objects
140     are cleared.  Clearing the object means discarding its contents to
141     release memory.  The next time the object is accessed, it must be
142     re-fetched from the server.  Note that the inode cache limit is a
143     soft limit; the cache limit may be exceeded if necessary to load
144     very large projects or collections, it may also be exceeded if an
145     inode can't be safely discarded based on kernel lookups
146     (has_ref()) or internal use count (in_use()).
147
148     """
149
150     def __init__(self, cap, min_entries=4):
151         # Standard dictionaries are ordered, but OrderedDict is still better here, see
152         # https://docs.python.org/3.11/library/collections.html#ordereddict-objects
153         # specifically we use move_to_end() which standard dicts don't have.
154         self._cache_entries = collections.OrderedDict()
155         self.cap = cap
156         self._total = 0
157         self.min_entries = min_entries
158
159     def total(self):
160         return self._total
161
162     def evict_candidates(self):
163         """Yield entries that are candidates to be evicted
164         and stop when the cache total has shrunk sufficiently.
165
166         Implements a LRU cache, when an item is added or touch()ed it
167         goes to the back of the OrderedDict, so items in the front are
168         oldest.  The Inodes._remove() function determines if the entry
169         can actually be removed safely.
170
171         """
172
173         if self._total <= self.cap:
174             return
175
176         _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
177
178         # Copy this into a deque for two reasons:
179         #
180         # 1. _cache_entries is modified by unmanage() which is called
181         # by _remove
182         #
183         # 2. popping off the front means the reference goes away
184         # immediately intead of sticking around for the lifetime of
185         # "values"
186         values = collections.deque(self._cache_entries.values())
187
188         while values:
189             if self._total < self.cap or len(self._cache_entries) < self.min_entries:
190                 break
191             yield values.popleft()
192
193     def unmanage(self, entry):
194         """Stop managing an object in the cache.
195
196         This happens when an object is being removed from the inode
197         entries table.
198
199         """
200
201         if entry.inode not in self._cache_entries:
202             return
203
204         # manage cache size running sum
205         self._total -= entry.cache_size
206         entry.cache_size = 0
207
208         # Now forget about it
209         del self._cache_entries[entry.inode]
210
211     def update_cache_size(self, obj):
212         """Update the cache total in response to the footprint of an
213         object changing (usually because it has been loaded or
214         cleared).
215
216         Adds or removes entries to the cache list based on the object
217         cache size.
218
219         """
220
221         if not obj.persisted():
222             return
223
224         if obj.inode in self._cache_entries:
225             self._total -= obj.cache_size
226
227         obj.cache_size = obj.objsize()
228
229         if obj.cache_size > 0 or obj.parent_inode is None:
230             self._total += obj.cache_size
231             self._cache_entries[obj.inode] = obj
232         elif obj.cache_size == 0 and obj.inode in self._cache_entries:
233             del self._cache_entries[obj.inode]
234
235     def touch(self, obj):
236         """Indicate an object was used recently, making it low
237         priority to be removed from the cache.
238
239         """
240         if obj.inode in self._cache_entries:
241             self._cache_entries.move_to_end(obj.inode)
242             return True
243         return False
244
245     def clear(self):
246         self._cache_entries.clear()
247         self._total = 0
248
249 @dataclass
250 class RemoveInode:
251     entry: typing.Union[Directory, File]
252     def inode_op(self, inodes, locked_ops):
253         if locked_ops is None:
254             inodes._remove(self.entry)
255             return True
256         else:
257             locked_ops.append(self)
258             return False
259
260 @dataclass
261 class InvalidateInode:
262     inode: int
263     def inode_op(self, inodes, locked_ops):
264         llfuse.invalidate_inode(self.inode)
265         return True
266
267 @dataclass
268 class InvalidateEntry:
269     inode: int
270     name: str
271     def inode_op(self, inodes, locked_ops):
272         llfuse.invalidate_entry(self.inode, self.name)
273         return True
274
275 @dataclass
276 class EvictCandidates:
277     def inode_op(self, inodes, locked_ops):
278         return True
279
280
281 class Inodes(object):
282     """Manage the set of inodes.
283
284     This is the mapping from a numeric id to a concrete File or
285     Directory object
286
287     """
288
289     def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
290         self._entries = {}
291         self._counter = itertools.count(llfuse.ROOT_INODE)
292         self.inode_cache = inode_cache
293         self.encoding = encoding
294         self._fsns = fsns
295         self._shutdown_started = shutdown_started or threading.Event()
296
297         self._inode_remove_queue = queue.Queue()
298         self._inode_remove_thread = threading.Thread(None, self._inode_remove)
299         self._inode_remove_thread.daemon = True
300         self._inode_remove_thread.start()
301
302         self.cap_cache_event = threading.Event()
303         self._by_uuid = collections.defaultdict(list)
304
305     def __getitem__(self, item):
306         return self._entries[item]
307
308     def __setitem__(self, key, item):
309         self._entries[key] = item
310
311     def __iter__(self):
312         return iter(self._entries.keys())
313
314     def items(self):
315         return self._entries.items()
316
317     def __contains__(self, k):
318         return k in self._entries
319
320     def touch(self, entry):
321         """Update the access time, adjust the cache position, and
322         notify the _inode_remove thread to recheck the cache.
323
324         """
325
326         entry._atime = time.time()
327         if self.inode_cache.touch(entry):
328             self.cap_cache()
329
330     def cap_cache(self):
331         """Notify the _inode_remove thread to recheck the cache."""
332         if not self.cap_cache_event.is_set():
333             self.cap_cache_event.set()
334             self._inode_remove_queue.put(EvictCandidates())
335
336     def update_uuid(self, entry):
337         """Update the Arvados uuid associated with an inode entry.
338
339         This is used to look up inodes that need to be invalidated
340         when a websocket event indicates the object has changed on the
341         API server.
342
343         """
344         if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
345             self._by_uuid[entry.cache_uuid].remove(entry)
346
347         entry.cache_uuid = entry.uuid()
348         if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
349             self._by_uuid[entry.cache_uuid].append(entry)
350
351         if not self._by_uuid[entry.cache_uuid]:
352             del self._by_uuid[entry.cache_uuid]
353
354     def add_entry(self, entry):
355         """Assign a numeric inode to a new entry."""
356
357         entry.inode = next(self._counter)
358         if entry.inode == llfuse.ROOT_INODE:
359             entry.inc_ref()
360         self._entries[entry.inode] = entry
361
362         self.update_uuid(entry)
363         self.inode_cache.update_cache_size(entry)
364         self.cap_cache()
365         return entry
366
367     def del_entry(self, entry):
368         """Remove entry from the inode table.
369
370         Indicate this inode entry is pending deletion by setting
371         parent_inode to None.  Notify the _inode_remove thread to try
372         and remove it.
373
374         """
375
376         entry.parent_inode = None
377         self._inode_remove_queue.put(RemoveInode(entry))
378         _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
379
380     def _inode_remove(self):
381         """Background thread to handle tasks related to invalidating
382         inodes in the kernel, and removing objects from the inodes
383         table entirely.
384
385         """
386
387         locked_ops = collections.deque()
388         while True:
389             blocking_get = True
390             while True:
391                 try:
392                     qentry = self._inode_remove_queue.get(blocking_get)
393                 except queue.Empty:
394                     break
395                 blocking_get = False
396                 if qentry is None:
397                     return
398
399                 if self._shutdown_started.is_set():
400                     continue
401
402                 # Process this entry
403                 if qentry.inode_op(self, locked_ops):
404                     self._inode_remove_queue.task_done()
405
406                 # Give up the reference
407                 qentry = None
408
409             with llfuse.lock:
410                 while locked_ops:
411                     if locked_ops.popleft().inode_op(self, None):
412                         self._inode_remove_queue.task_done()
413                 self.cap_cache_event.clear()
414                 for entry in self.inode_cache.evict_candidates():
415                     self._remove(entry)
416
417     def wait_remove_queue_empty(self):
418         # used by tests
419         self._inode_remove_queue.join()
420
421     def _remove(self, entry):
422         """Remove an inode entry if possible.
423
424         If the entry is still referenced or in use, don't do anything.
425         If this is not referenced but the parent is still referenced,
426         clear any data held by the object (which may include directory
427         entries under the object) but don't remove it from the inode
428         table.
429
430         """
431         try:
432             if entry.inode is None:
433                 # Removed already
434                 return
435
436             if entry.inode == llfuse.ROOT_INODE:
437                 return
438
439             if entry.in_use():
440                 # referenced internally, stay pinned
441                 #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
442                 return
443
444             # Tell the kernel it should forget about it
445             entry.kernel_invalidate()
446
447             if entry.has_ref():
448                 # has kernel reference, could still be accessed.
449                 # when the kernel forgets about it, we can delete it.
450                 #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
451                 return
452
453             # commit any pending changes
454             with llfuse.lock_released:
455                 entry.finalize()
456
457             # Clear the contents
458             entry.clear()
459
460             if entry.parent_inode is None:
461                 _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
462                               entry.inode, entry.cache_size, self.inode_cache.total(),
463                               len(self._entries), type(entry))
464
465                 if entry.cache_uuid:
466                     self._by_uuid[entry.cache_uuid].remove(entry)
467                     if not self._by_uuid[entry.cache_uuid]:
468                         del self._by_uuid[entry.cache_uuid]
469                     entry.cache_uuid = None
470
471                 self.inode_cache.unmanage(entry)
472
473                 del self._entries[entry.inode]
474                 entry.inode = None
475
476         except Exception as e:
477             _logger.exception("failed remove")
478
479     def invalidate_inode(self, entry):
480         if entry.has_ref():
481             # Only necessary if the kernel has previously done a lookup on this
482             # inode and hasn't yet forgotten about it.
483             self._inode_remove_queue.put(InvalidateInode(entry.inode))
484
485     def invalidate_entry(self, entry, name):
486         if entry.has_ref():
487             # Only necessary if the kernel has previously done a lookup on this
488             # inode and hasn't yet forgotten about it.
489             self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
490
491     def begin_shutdown(self):
492         self._inode_remove_queue.put(None)
493         if self._inode_remove_thread is not None:
494             self._inode_remove_thread.join()
495         self._inode_remove_thread = None
496
497     def clear(self):
498         with llfuse.lock_released:
499             self.begin_shutdown()
500
501         self.inode_cache.clear()
502         self._by_uuid.clear()
503
504         for k,v in self._entries.items():
505             try:
506                 v.finalize()
507             except Exception as e:
508                 _logger.exception("Error during finalize of inode %i", k)
509
510         self._entries.clear()
511
512     def forward_slash_subst(self):
513         return self._fsns
514
515     def find_by_uuid(self, uuid):
516         """Return a list of zero or more inode entries corresponding
517         to this Arvados UUID."""
518         return self._by_uuid.get(uuid, [])
519
520
521 def catch_exceptions(orig_func):
522     """Catch uncaught exceptions and log them consistently."""
523
524     @functools.wraps(orig_func)
525     def catch_exceptions_wrapper(self, *args, **kwargs):
526         try:
527             return orig_func(self, *args, **kwargs)
528         except llfuse.FUSEError:
529             raise
530         except EnvironmentError as e:
531             raise llfuse.FUSEError(e.errno)
532         except NotImplementedError:
533             raise llfuse.FUSEError(errno.ENOTSUP)
534         except arvados.errors.KeepWriteError as e:
535             _logger.error("Keep write error: " + str(e))
536             raise llfuse.FUSEError(errno.EIO)
537         except arvados.errors.NotFoundError as e:
538             _logger.error("Block not found error: " + str(e))
539             raise llfuse.FUSEError(errno.EIO)
540         except:
541             _logger.exception("Unhandled exception during FUSE operation")
542             raise llfuse.FUSEError(errno.EIO)
543
544     return catch_exceptions_wrapper
545
546
547 class Operations(llfuse.Operations):
548     """This is the main interface with llfuse.
549
550     The methods on this object are called by llfuse threads to service FUSE
551     events to query and read from the file system.
552
553     llfuse has its own global lock which is acquired before calling a request handler,
554     so request handlers do not run concurrently unless the lock is explicitly released
555     using 'with llfuse.lock_released:'
556
557     """
558
559     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
560     read_time = fuse_time.labels(op='read')
561     write_time = fuse_time.labels(op='write')
562     destroy_time = fuse_time.labels(op='destroy')
563     on_event_time = fuse_time.labels(op='on_event')
564     getattr_time = fuse_time.labels(op='getattr')
565     setattr_time = fuse_time.labels(op='setattr')
566     lookup_time = fuse_time.labels(op='lookup')
567     forget_time = fuse_time.labels(op='forget')
568     open_time = fuse_time.labels(op='open')
569     release_time = fuse_time.labels(op='release')
570     opendir_time = fuse_time.labels(op='opendir')
571     readdir_time = fuse_time.labels(op='readdir')
572     statfs_time = fuse_time.labels(op='statfs')
573     create_time = fuse_time.labels(op='create')
574     mkdir_time = fuse_time.labels(op='mkdir')
575     unlink_time = fuse_time.labels(op='unlink')
576     rmdir_time = fuse_time.labels(op='rmdir')
577     rename_time = fuse_time.labels(op='rename')
578     flush_time = fuse_time.labels(op='flush')
579
580     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
581         super(Operations, self).__init__()
582
583         self._api_client = api_client
584
585         if not inode_cache:
586             inode_cache = InodeCache(cap=256*1024*1024)
587
588         if fsns is None:
589             try:
590                 fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
591             except KeyError:
592                 # old API server with no FSNS config
593                 fsns = '_'
594             else:
595                 if fsns == '' or fsns == '/':
596                     fsns = None
597
598         # If we get overlapping shutdown events (e.g., fusermount -u
599         # -z and operations.destroy()) llfuse calls forget() on inodes
600         # that have already been deleted. To avoid this, we make
601         # forget() a no-op if called after destroy().
602         self._shutdown_started = threading.Event()
603
604         self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
605                              shutdown_started=self._shutdown_started)
606         self.uid = uid
607         self.gid = gid
608         self.enable_write = enable_write
609
610         # dict of inode to filehandle
611         self._filehandles = {}
612         self._filehandles_counter = itertools.count(0)
613
614         # Other threads that need to wait until the fuse driver
615         # is fully initialized should wait() on this event object.
616         self.initlock = threading.Event()
617
618         self.num_retries = num_retries
619
620         self.read_counter = arvados.keep.Counter()
621         self.write_counter = arvados.keep.Counter()
622         self.read_ops_counter = arvados.keep.Counter()
623         self.write_ops_counter = arvados.keep.Counter()
624
625         self.events = None
626
627     def init(self):
628         # Allow threads that are waiting for the driver to be finished
629         # initializing to continue
630         self.initlock.set()
631
632     def metric_samples(self):
633         return self.fuse_time.collect()[0].samples
634
635     def metric_op_names(self):
636         ops = []
637         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
638             if cur_op not in ops:
639                 ops.append(cur_op)
640         return ops
641
642     def metric_value(self, opname, metric):
643         op_value = [sample.value for sample in self.metric_samples()
644                     if sample.name == metric and sample.labels['op'] == opname]
645         return op_value[0] if len(op_value) == 1 else None
646
647     def metric_sum_func(self, opname):
648         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
649
650     def metric_count_func(self, opname):
651         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
652
653     def begin_shutdown(self):
654         self._shutdown_started.set()
655         self.inodes.begin_shutdown()
656
657     @destroy_time.time()
658     @catch_exceptions
659     def destroy(self):
660         _logger.debug("arv-mount destroy: start")
661
662         with llfuse.lock_released:
663             self.begin_shutdown()
664
665         if self.events:
666             self.events.close()
667             self.events = None
668
669         self.inodes.clear()
670
671         _logger.debug("arv-mount destroy: complete")
672
673
674     def access(self, inode, mode, ctx):
675         return True
676
677     def listen_for_events(self):
678         self.events = arvados.events.subscribe(
679             self._api_client,
680             [["event_type", "in", ["create", "update", "delete"]]],
681             self.on_event)
682
683     @on_event_time.time()
684     @catch_exceptions
685     def on_event(self, ev):
686         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
687             return
688         with llfuse.lock:
689             properties = ev.get("properties") or {}
690             old_attrs = properties.get("old_attributes") or {}
691             new_attrs = properties.get("new_attributes") or {}
692
693             for item in self.inodes.find_by_uuid(ev["object_uuid"]):
694                 item.invalidate()
695
696             oldowner = old_attrs.get("owner_uuid")
697             newowner = ev.get("object_owner_uuid")
698             for parent in (
699                     self.inodes.find_by_uuid(oldowner) +
700                     self.inodes.find_by_uuid(newowner)):
701                 parent.invalidate()
702
703     @getattr_time.time()
704     @catch_exceptions
705     def getattr(self, inode, ctx=None):
706         if inode not in self.inodes:
707             _logger.debug("arv-mount getattr: inode %i missing", inode)
708             raise llfuse.FUSEError(errno.ENOENT)
709
710         e = self.inodes[inode]
711         self.inodes.touch(e)
712         parent = None
713         if e.parent_inode:
714             parent = self.inodes[e.parent_inode]
715             self.inodes.touch(parent)
716
717         entry = llfuse.EntryAttributes()
718         entry.st_ino = inode
719         entry.generation = 0
720         entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
721         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
722
723         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
724         if isinstance(e, Directory):
725             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
726         else:
727             entry.st_mode |= stat.S_IFREG
728             if isinstance(e, FuseArvadosFile):
729                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
730
731         if self.enable_write and e.writable():
732             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
733
734         entry.st_nlink = 1
735         entry.st_uid = self.uid
736         entry.st_gid = self.gid
737         entry.st_rdev = 0
738
739         entry.st_size = e.size()
740
741         entry.st_blksize = 512
742         entry.st_blocks = (entry.st_size // 512) + 1
743         if hasattr(entry, 'st_atime_ns'):
744             # llfuse >= 0.42
745             entry.st_atime_ns = int(e.atime() * 1000000000)
746             entry.st_mtime_ns = int(e.mtime() * 1000000000)
747             entry.st_ctime_ns = int(e.mtime() * 1000000000)
748         else:
749             # llfuse < 0.42
750             entry.st_atime = int(e.atime)
751             entry.st_mtime = int(e.mtime)
752             entry.st_ctime = int(e.mtime)
753
754         return entry
755
756     @setattr_time.time()
757     @catch_exceptions
758     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
759         entry = self.getattr(inode)
760
761         if fh is not None and fh in self._filehandles:
762             handle = self._filehandles[fh]
763             e = handle.obj
764         else:
765             e = self.inodes[inode]
766
767         if fields is None:
768             # llfuse < 0.42
769             update_size = attr.st_size is not None
770         else:
771             # llfuse >= 0.42
772             update_size = fields.update_size
773         if update_size and isinstance(e, FuseArvadosFile):
774             with llfuse.lock_released:
775                 e.arvfile.truncate(attr.st_size)
776                 entry.st_size = e.arvfile.size()
777
778         return entry
779
780     @lookup_time.time()
781     @catch_exceptions
782     def lookup(self, parent_inode, name, ctx=None):
783         name = str(name, self.inodes.encoding)
784         inode = None
785
786         if name == '.':
787             inode = parent_inode
788         elif parent_inode in self.inodes:
789             p = self.inodes[parent_inode]
790             self.inodes.touch(p)
791             if name == '..':
792                 inode = p.parent_inode
793             elif isinstance(p, Directory) and name in p:
794                 if p[name].inode is None:
795                     _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
796                                   parent_inode, name)
797                     raise llfuse.FUSEError(errno.ENOENT)
798
799                 inode = p[name].inode
800
801         if inode != None:
802             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
803                       parent_inode, name, inode)
804             self.inodes.touch(self.inodes[inode])
805             self.inodes[inode].inc_ref()
806             return self.getattr(inode)
807         else:
808             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
809                       parent_inode, name)
810             raise llfuse.FUSEError(errno.ENOENT)
811
812     @forget_time.time()
813     @catch_exceptions
814     def forget(self, inodes):
815         if self._shutdown_started.is_set():
816             return
817         for inode, nlookup in inodes:
818             ent = self.inodes[inode]
819             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
820             if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
821                 self.inodes.del_entry(ent)
822
823     @open_time.time()
824     @catch_exceptions
825     def open(self, inode, flags, ctx=None):
826         if inode in self.inodes:
827             p = self.inodes[inode]
828         else:
829             _logger.debug("arv-mount open: inode %i missing", inode)
830             raise llfuse.FUSEError(errno.ENOENT)
831
832         if isinstance(p, Directory):
833             raise llfuse.FUSEError(errno.EISDIR)
834
835         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
836             raise llfuse.FUSEError(errno.EPERM)
837
838         fh = next(self._filehandles_counter)
839         self._filehandles[fh] = FileHandle(fh, p)
840         self.inodes.touch(p)
841
842         # Normally, we will have received an "update" event if the
843         # parent collection is stale here. However, even if the parent
844         # collection hasn't changed, the manifest might have been
845         # fetched so long ago that the signatures on the data block
846         # locators have expired. Calling checkupdate() on all
847         # ancestors ensures the signatures will be refreshed if
848         # necessary.
849         while p.parent_inode in self.inodes:
850             if p == self.inodes[p.parent_inode]:
851                 break
852             p = self.inodes[p.parent_inode]
853             self.inodes.touch(p)
854             p.checkupdate()
855
856         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
857
858         return fh
859
860     @read_time.time()
861     @catch_exceptions
862     def read(self, fh, off, size):
863         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
864         self.read_ops_counter.add(1)
865
866         if fh in self._filehandles:
867             handle = self._filehandles[fh]
868         else:
869             raise llfuse.FUSEError(errno.EBADF)
870
871         self.inodes.touch(handle.obj)
872
873         r = handle.obj.readfrom(off, size, self.num_retries)
874         if r:
875             self.read_counter.add(len(r))
876         return r
877
878     @write_time.time()
879     @catch_exceptions
880     def write(self, fh, off, buf):
881         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
882         self.write_ops_counter.add(1)
883
884         if fh in self._filehandles:
885             handle = self._filehandles[fh]
886         else:
887             raise llfuse.FUSEError(errno.EBADF)
888
889         if not handle.obj.writable():
890             raise llfuse.FUSEError(errno.EPERM)
891
892         self.inodes.touch(handle.obj)
893
894         w = handle.obj.writeto(off, buf, self.num_retries)
895         if w:
896             self.write_counter.add(w)
897         return w
898
899     @release_time.time()
900     @catch_exceptions
901     def release(self, fh):
902         if fh in self._filehandles:
903             _logger.debug("arv-mount release fh %i", fh)
904             try:
905                 self._filehandles[fh].flush()
906             except Exception:
907                 raise
908             finally:
909                 self._filehandles[fh].release()
910                 del self._filehandles[fh]
911         self.inodes.cap_cache()
912
913     def releasedir(self, fh):
914         self.release(fh)
915
916     @opendir_time.time()
917     @catch_exceptions
918     def opendir(self, inode, ctx=None):
919         _logger.debug("arv-mount opendir: inode %i", inode)
920
921         if inode in self.inodes:
922             p = self.inodes[inode]
923         else:
924             _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
925             raise llfuse.FUSEError(errno.ENOENT)
926
927         if not isinstance(p, Directory):
928             raise llfuse.FUSEError(errno.ENOTDIR)
929
930         fh = next(self._filehandles_counter)
931         if p.parent_inode in self.inodes:
932             parent = self.inodes[p.parent_inode]
933         else:
934             _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
935             raise llfuse.FUSEError(errno.EIO)
936
937         _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
938
939         # update atime
940         p.inc_use()
941         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
942         p.dec_use()
943         self.inodes.touch(p)
944         return fh
945
946     @readdir_time.time()
947     @catch_exceptions
948     def readdir(self, fh, off):
949         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
950
951         if fh in self._filehandles:
952             handle = self._filehandles[fh]
953         else:
954             raise llfuse.FUSEError(errno.EBADF)
955
956         e = off
957         while e < len(handle.entries):
958             ent = handle.entries[e]
959             if ent[1].inode in self.inodes:
960                 yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
961             e += 1
962
963     @statfs_time.time()
964     @catch_exceptions
965     def statfs(self, ctx=None):
966         st = llfuse.StatvfsData()
967         st.f_bsize = 128 * 1024
968         st.f_blocks = 0
969         st.f_files = 0
970
971         st.f_bfree = 0
972         st.f_bavail = 0
973
974         st.f_ffree = 0
975         st.f_favail = 0
976
977         st.f_frsize = 0
978         return st
979
980     def _check_writable(self, inode_parent):
981         if not self.enable_write:
982             raise llfuse.FUSEError(errno.EROFS)
983
984         if inode_parent in self.inodes:
985             p = self.inodes[inode_parent]
986         else:
987             raise llfuse.FUSEError(errno.ENOENT)
988
989         if not isinstance(p, Directory):
990             raise llfuse.FUSEError(errno.ENOTDIR)
991
992         if not p.writable():
993             raise llfuse.FUSEError(errno.EPERM)
994
995         return p
996
997     @create_time.time()
998     @catch_exceptions
999     def create(self, inode_parent, name, mode, flags, ctx=None):
1000         name = name.decode(encoding=self.inodes.encoding)
1001         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
1002
1003         p = self._check_writable(inode_parent)
1004         p.create(name)
1005
1006         # The file entry should have been implicitly created by callback.
1007         f = p[name]
1008         fh = next(self._filehandles_counter)
1009         self._filehandles[fh] = FileHandle(fh, f)
1010         self.inodes.touch(p)
1011
1012         f.inc_ref()
1013         return (fh, self.getattr(f.inode))
1014
1015     @mkdir_time.time()
1016     @catch_exceptions
1017     def mkdir(self, inode_parent, name, mode, ctx=None):
1018         name = name.decode(encoding=self.inodes.encoding)
1019         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
1020
1021         p = self._check_writable(inode_parent)
1022         p.mkdir(name)
1023
1024         # The dir entry should have been implicitly created by callback.
1025         d = p[name]
1026
1027         d.inc_ref()
1028         return self.getattr(d.inode)
1029
1030     @unlink_time.time()
1031     @catch_exceptions
1032     def unlink(self, inode_parent, name, ctx=None):
1033         name = name.decode(encoding=self.inodes.encoding)
1034         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
1035         p = self._check_writable(inode_parent)
1036         p.unlink(name)
1037
1038     @rmdir_time.time()
1039     @catch_exceptions
1040     def rmdir(self, inode_parent, name, ctx=None):
1041         name = name.decode(encoding=self.inodes.encoding)
1042         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
1043         p = self._check_writable(inode_parent)
1044         p.rmdir(name)
1045
1046     @rename_time.time()
1047     @catch_exceptions
1048     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
1049         name_old = name_old.decode(encoding=self.inodes.encoding)
1050         name_new = name_new.decode(encoding=self.inodes.encoding)
1051         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
1052         src = self._check_writable(inode_parent_old)
1053         dest = self._check_writable(inode_parent_new)
1054         dest.rename(name_old, name_new, src)
1055
1056     @flush_time.time()
1057     @catch_exceptions
1058     def flush(self, fh):
1059         if fh in self._filehandles:
1060             self._filehandles[fh].flush()
1061
1062     def fsync(self, fh, datasync):
1063         self.flush(fh)
1064
1065     def fsyncdir(self, fh, datasync):
1066         self.flush(fh)