Merge branch '21364-load-more-button'
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 Implementation note: in the code, the terms 'object', 'entry' and
51 'inode' are used somewhat interchangeably, but generally mean an
52 arvados_fuse.File or arvados_fuse.Directory object which has numeric
53 inode assigned to it and appears in the Inodes._entries dictionary.
54
55 """
56
57 import os
58 import llfuse
59 import errno
60 import stat
61 import threading
62 import arvados
63 import arvados.events
64 import logging
65 import time
66 import threading
67 import itertools
68 import collections
69 import functools
70 import arvados.keep
71 from prometheus_client import Summary
72 import queue
73 from dataclasses import dataclass
74 import typing
75
76 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
77 from .fusefile import File, StringFile, FuseArvadosFile
78
79 _logger = logging.getLogger('arvados.arvados_fuse')
80
81 # Uncomment this to enable llfuse debug logging.
82 # log_handler = logging.StreamHandler()
83 # llogger = logging.getLogger('llfuse')
84 # llogger.addHandler(log_handler)
85 # llogger.setLevel(logging.DEBUG)
86
87 class Handle(object):
88     """Connects a numeric file handle to a File or Directory object that has
89     been opened by the client."""
90
91     def __init__(self, fh, obj):
92         self.fh = fh
93         self.obj = obj
94         self.obj.inc_use()
95
96     def release(self):
97         self.obj.dec_use()
98
99     def flush(self):
100         pass
101
102
103 class FileHandle(Handle):
104     """Connects a numeric file handle to a File  object that has
105     been opened by the client."""
106
107     def flush(self):
108         if self.obj.writable():
109             return self.obj.flush()
110
111
112 class DirectoryHandle(Handle):
113     """Connects a numeric file handle to a Directory object that has
114     been opened by the client.
115
116     DirectoryHandle is used by opendir() and readdir() to get
117     directory listings.  Entries returned by readdir() don't increment
118     the lookup count (kernel references), so increment our internal
119     "use count" to avoid having an item being removed mid-read.
120
121     """
122
123     def __init__(self, fh, dirobj, entries):
124         super(DirectoryHandle, self).__init__(fh, dirobj)
125         self.entries = entries
126
127         for ent in self.entries:
128             ent[1].inc_use()
129
130     def release(self):
131         for ent in self.entries:
132             ent[1].dec_use()
133         super(DirectoryHandle, self).release()
134
135
136 class InodeCache(object):
137     """Records the memory footprint of objects and when they are last used.
138
139     When the cache limit is exceeded, the least recently used objects
140     are cleared.  Clearing the object means discarding its contents to
141     release memory.  The next time the object is accessed, it must be
142     re-fetched from the server.  Note that the inode cache limit is a
143     soft limit; the cache limit may be exceeded if necessary to load
144     very large projects or collections, it may also be exceeded if an
145     inode can't be safely discarded based on kernel lookups
146     (has_ref()) or internal use count (in_use()).
147
148     """
149
150     def __init__(self, cap, min_entries=4):
151         # Standard dictionaries are ordered, but OrderedDict is still better here, see
152         # https://docs.python.org/3.11/library/collections.html#ordereddict-objects
153         # specifically we use move_to_end() which standard dicts don't have.
154         self._cache_entries = collections.OrderedDict()
155         self.cap = cap
156         self._total = 0
157         self.min_entries = min_entries
158
159     def total(self):
160         return self._total
161
162     def evict_candidates(self):
163         """Yield entries that are candidates to be evicted
164         and stop when the cache total has shrunk sufficiently.
165
166         Implements a LRU cache, when an item is added or touch()ed it
167         goes to the back of the OrderedDict, so items in the front are
168         oldest.  The Inodes._remove() function determines if the entry
169         can actually be removed safely.
170
171         """
172
173         if self._total <= self.cap:
174             return
175
176         _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
177
178         # Copy this into a deque for two reasons:
179         #
180         # 1. _cache_entries is modified by unmanage() which is called
181         # by _remove
182         #
183         # 2. popping off the front means the reference goes away
184         # immediately intead of sticking around for the lifetime of
185         # "values"
186         values = collections.deque(self._cache_entries.values())
187
188         while values:
189             if self._total < self.cap or len(self._cache_entries) < self.min_entries:
190                 break
191             yield values.popleft()
192
193     def unmanage(self, entry):
194         """Stop managing an object in the cache.
195
196         This happens when an object is being removed from the inode
197         entries table.
198
199         """
200
201         if entry.inode not in self._cache_entries:
202             return
203
204         # manage cache size running sum
205         self._total -= entry.cache_size
206         entry.cache_size = 0
207
208         # Now forget about it
209         del self._cache_entries[entry.inode]
210
211     def update_cache_size(self, obj):
212         """Update the cache total in response to the footprint of an
213         object changing (usually because it has been loaded or
214         cleared).
215
216         Adds or removes entries to the cache list based on the object
217         cache size.
218
219         """
220
221         if not obj.persisted():
222             return
223
224         if obj.inode in self._cache_entries:
225             self._total -= obj.cache_size
226
227         obj.cache_size = obj.objsize()
228
229         if obj.cache_size > 0 or obj.parent_inode is None:
230             self._total += obj.cache_size
231             self._cache_entries[obj.inode] = obj
232         elif obj.cache_size == 0 and obj.inode in self._cache_entries:
233             del self._cache_entries[obj.inode]
234
235     def touch(self, obj):
236         """Indicate an object was used recently, making it low
237         priority to be removed from the cache.
238
239         """
240         if obj.inode in self._cache_entries:
241             self._cache_entries.move_to_end(obj.inode)
242             return True
243         return False
244
245     def clear(self):
246         self._cache_entries.clear()
247         self._total = 0
248
249 @dataclass
250 class RemoveInode:
251     entry: typing.Union[Directory, File]
252     def inode_op(self, inodes, locked_ops):
253         if locked_ops is None:
254             inodes._remove(self.entry)
255             return True
256         else:
257             locked_ops.append(self)
258             return False
259
260 @dataclass
261 class InvalidateInode:
262     inode: int
263     def inode_op(self, inodes, locked_ops):
264         llfuse.invalidate_inode(self.inode)
265         return True
266
267 @dataclass
268 class InvalidateEntry:
269     inode: int
270     name: str
271     def inode_op(self, inodes, locked_ops):
272         llfuse.invalidate_entry(self.inode, self.name)
273         return True
274
275 @dataclass
276 class EvictCandidates:
277     def inode_op(self, inodes, locked_ops):
278         return True
279
280
281 class Inodes(object):
282     """Manage the set of inodes.
283
284     This is the mapping from a numeric id to a concrete File or
285     Directory object
286
287     """
288
289     def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
290         self._entries = {}
291         self._counter = itertools.count(llfuse.ROOT_INODE)
292         self.inode_cache = inode_cache
293         self.encoding = encoding
294         self._fsns = fsns
295         self._shutdown_started = shutdown_started or threading.Event()
296
297         self._inode_remove_queue = queue.Queue()
298         self._inode_remove_thread = threading.Thread(None, self._inode_remove)
299         self._inode_remove_thread.daemon = True
300         self._inode_remove_thread.start()
301
302         self._by_uuid = collections.defaultdict(list)
303
304     def __getitem__(self, item):
305         return self._entries[item]
306
307     def __setitem__(self, key, item):
308         self._entries[key] = item
309
310     def __iter__(self):
311         return iter(self._entries.keys())
312
313     def items(self):
314         return self._entries.items()
315
316     def __contains__(self, k):
317         return k in self._entries
318
319     def touch(self, entry):
320         """Update the access time, adjust the cache position, and
321         notify the _inode_remove thread to recheck the cache.
322
323         """
324
325         entry._atime = time.time()
326         if self.inode_cache.touch(entry):
327             self.cap_cache()
328
329     def cap_cache(self):
330         """Notify the _inode_remove thread to recheck the cache."""
331         if self._inode_remove_queue.empty():
332             self._inode_remove_queue.put(EvictCandidates())
333
334     def update_uuid(self, entry):
335         """Update the Arvados uuid associated with an inode entry.
336
337         This is used to look up inodes that need to be invalidated
338         when a websocket event indicates the object has changed on the
339         API server.
340
341         """
342         if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
343             self._by_uuid[entry.cache_uuid].remove(entry)
344
345         entry.cache_uuid = entry.uuid()
346         if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
347             self._by_uuid[entry.cache_uuid].append(entry)
348
349         if not self._by_uuid[entry.cache_uuid]:
350             del self._by_uuid[entry.cache_uuid]
351
352     def add_entry(self, entry):
353         """Assign a numeric inode to a new entry."""
354
355         entry.inode = next(self._counter)
356         if entry.inode == llfuse.ROOT_INODE:
357             entry.inc_ref()
358         self._entries[entry.inode] = entry
359
360         self.update_uuid(entry)
361         self.inode_cache.update_cache_size(entry)
362         self.cap_cache()
363         return entry
364
365     def del_entry(self, entry):
366         """Remove entry from the inode table.
367
368         Indicate this inode entry is pending deletion by setting
369         parent_inode to None.  Notify the _inode_remove thread to try
370         and remove it.
371
372         """
373
374         entry.parent_inode = None
375         self._inode_remove_queue.put(RemoveInode(entry))
376         _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
377
378     def _inode_remove(self):
379         """Background thread to handle tasks related to invalidating
380         inodes in the kernel, and removing objects from the inodes
381         table entirely.
382
383         """
384
385         locked_ops = collections.deque()
386         shutting_down = False
387         while not shutting_down:
388             tasks_done = 0
389             blocking_get = True
390             while True:
391                 try:
392                     qentry = self._inode_remove_queue.get(blocking_get)
393                 except queue.Empty:
394                     break
395
396                 blocking_get = False
397                 if qentry is None:
398                     shutting_down = True
399                     continue
400
401                 # Process (or defer) this entry
402                 qentry.inode_op(self, locked_ops)
403                 tasks_done += 1
404
405                 # Give up the reference
406                 qentry = None
407
408             with llfuse.lock:
409                 while locked_ops:
410                     locked_ops.popleft().inode_op(self, None)
411                 for entry in self.inode_cache.evict_candidates():
412                     self._remove(entry)
413
414             # Unblock _inode_remove_queue.join() only when all of the
415             # deferred work is done, i.e., after calling inode_op()
416             # and then evict_candidates().
417             for _ in range(tasks_done):
418                 self._inode_remove_queue.task_done()
419
420     def wait_remove_queue_empty(self):
421         # used by tests
422         self._inode_remove_queue.join()
423
424     def _remove(self, entry):
425         """Remove an inode entry if possible.
426
427         If the entry is still referenced or in use, don't do anything.
428         If this is not referenced but the parent is still referenced,
429         clear any data held by the object (which may include directory
430         entries under the object) but don't remove it from the inode
431         table.
432
433         """
434         try:
435             if entry.inode is None:
436                 # Removed already
437                 return
438
439             if entry.inode == llfuse.ROOT_INODE:
440                 return
441
442             if entry.in_use():
443                 # referenced internally, stay pinned
444                 #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
445                 return
446
447             # Tell the kernel it should forget about it
448             entry.kernel_invalidate()
449
450             if entry.has_ref():
451                 # has kernel reference, could still be accessed.
452                 # when the kernel forgets about it, we can delete it.
453                 #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
454                 return
455
456             # commit any pending changes
457             with llfuse.lock_released:
458                 entry.finalize()
459
460             # Clear the contents
461             entry.clear()
462
463             if entry.parent_inode is None:
464                 _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
465                               entry.inode, entry.cache_size, self.inode_cache.total(),
466                               len(self._entries), type(entry))
467
468                 if entry.cache_uuid:
469                     self._by_uuid[entry.cache_uuid].remove(entry)
470                     if not self._by_uuid[entry.cache_uuid]:
471                         del self._by_uuid[entry.cache_uuid]
472                     entry.cache_uuid = None
473
474                 self.inode_cache.unmanage(entry)
475
476                 del self._entries[entry.inode]
477                 entry.inode = None
478
479         except Exception as e:
480             _logger.exception("failed remove")
481
482     def invalidate_inode(self, entry):
483         if entry.has_ref():
484             # Only necessary if the kernel has previously done a lookup on this
485             # inode and hasn't yet forgotten about it.
486             self._inode_remove_queue.put(InvalidateInode(entry.inode))
487
488     def invalidate_entry(self, entry, name):
489         if entry.has_ref():
490             # Only necessary if the kernel has previously done a lookup on this
491             # inode and hasn't yet forgotten about it.
492             self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
493
494     def begin_shutdown(self):
495         self._inode_remove_queue.put(None)
496         if self._inode_remove_thread is not None:
497             self._inode_remove_thread.join()
498         self._inode_remove_thread = None
499
500     def clear(self):
501         with llfuse.lock_released:
502             self.begin_shutdown()
503
504         self.inode_cache.clear()
505         self._by_uuid.clear()
506
507         for k,v in self._entries.items():
508             try:
509                 v.finalize()
510             except Exception as e:
511                 _logger.exception("Error during finalize of inode %i", k)
512
513         self._entries.clear()
514
515     def forward_slash_subst(self):
516         return self._fsns
517
518     def find_by_uuid(self, uuid):
519         """Return a list of zero or more inode entries corresponding
520         to this Arvados UUID."""
521         return self._by_uuid.get(uuid, [])
522
523
524 def catch_exceptions(orig_func):
525     """Catch uncaught exceptions and log them consistently."""
526
527     @functools.wraps(orig_func)
528     def catch_exceptions_wrapper(self, *args, **kwargs):
529         try:
530             return orig_func(self, *args, **kwargs)
531         except llfuse.FUSEError:
532             raise
533         except EnvironmentError as e:
534             raise llfuse.FUSEError(e.errno)
535         except NotImplementedError:
536             raise llfuse.FUSEError(errno.ENOTSUP)
537         except arvados.errors.KeepWriteError as e:
538             _logger.error("Keep write error: " + str(e))
539             raise llfuse.FUSEError(errno.EIO)
540         except arvados.errors.NotFoundError as e:
541             _logger.error("Block not found error: " + str(e))
542             raise llfuse.FUSEError(errno.EIO)
543         except:
544             _logger.exception("Unhandled exception during FUSE operation")
545             raise llfuse.FUSEError(errno.EIO)
546
547     return catch_exceptions_wrapper
548
549
550 class Operations(llfuse.Operations):
551     """This is the main interface with llfuse.
552
553     The methods on this object are called by llfuse threads to service FUSE
554     events to query and read from the file system.
555
556     llfuse has its own global lock which is acquired before calling a request handler,
557     so request handlers do not run concurrently unless the lock is explicitly released
558     using 'with llfuse.lock_released:'
559
560     """
561
562     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
563     read_time = fuse_time.labels(op='read')
564     write_time = fuse_time.labels(op='write')
565     destroy_time = fuse_time.labels(op='destroy')
566     on_event_time = fuse_time.labels(op='on_event')
567     getattr_time = fuse_time.labels(op='getattr')
568     setattr_time = fuse_time.labels(op='setattr')
569     lookup_time = fuse_time.labels(op='lookup')
570     forget_time = fuse_time.labels(op='forget')
571     open_time = fuse_time.labels(op='open')
572     release_time = fuse_time.labels(op='release')
573     opendir_time = fuse_time.labels(op='opendir')
574     readdir_time = fuse_time.labels(op='readdir')
575     statfs_time = fuse_time.labels(op='statfs')
576     create_time = fuse_time.labels(op='create')
577     mkdir_time = fuse_time.labels(op='mkdir')
578     unlink_time = fuse_time.labels(op='unlink')
579     rmdir_time = fuse_time.labels(op='rmdir')
580     rename_time = fuse_time.labels(op='rename')
581     flush_time = fuse_time.labels(op='flush')
582
583     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
584         super(Operations, self).__init__()
585
586         self._api_client = api_client
587
588         if not inode_cache:
589             inode_cache = InodeCache(cap=256*1024*1024)
590
591         if fsns is None:
592             try:
593                 fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
594             except KeyError:
595                 # old API server with no FSNS config
596                 fsns = '_'
597             else:
598                 if fsns == '' or fsns == '/':
599                     fsns = None
600
601         # If we get overlapping shutdown events (e.g., fusermount -u
602         # -z and operations.destroy()) llfuse calls forget() on inodes
603         # that have already been deleted. To avoid this, we make
604         # forget() a no-op if called after destroy().
605         self._shutdown_started = threading.Event()
606
607         self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
608                              shutdown_started=self._shutdown_started)
609         self.uid = uid
610         self.gid = gid
611         self.enable_write = enable_write
612
613         # dict of inode to filehandle
614         self._filehandles = {}
615         self._filehandles_counter = itertools.count(0)
616
617         # Other threads that need to wait until the fuse driver
618         # is fully initialized should wait() on this event object.
619         self.initlock = threading.Event()
620
621         self.num_retries = num_retries
622
623         self.read_counter = arvados.keep.Counter()
624         self.write_counter = arvados.keep.Counter()
625         self.read_ops_counter = arvados.keep.Counter()
626         self.write_ops_counter = arvados.keep.Counter()
627
628         self.events = None
629
630     def init(self):
631         # Allow threads that are waiting for the driver to be finished
632         # initializing to continue
633         self.initlock.set()
634
635     def metric_samples(self):
636         return self.fuse_time.collect()[0].samples
637
638     def metric_op_names(self):
639         ops = []
640         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
641             if cur_op not in ops:
642                 ops.append(cur_op)
643         return ops
644
645     def metric_value(self, opname, metric):
646         op_value = [sample.value for sample in self.metric_samples()
647                     if sample.name == metric and sample.labels['op'] == opname]
648         return op_value[0] if len(op_value) == 1 else None
649
650     def metric_sum_func(self, opname):
651         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
652
653     def metric_count_func(self, opname):
654         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
655
656     def begin_shutdown(self):
657         self._shutdown_started.set()
658         self.inodes.begin_shutdown()
659
660     @destroy_time.time()
661     @catch_exceptions
662     def destroy(self):
663         _logger.debug("arv-mount destroy: start")
664
665         with llfuse.lock_released:
666             self.begin_shutdown()
667
668         if self.events:
669             self.events.close()
670             self.events = None
671
672         self.inodes.clear()
673
674         _logger.debug("arv-mount destroy: complete")
675
676
677     def access(self, inode, mode, ctx):
678         return True
679
680     def listen_for_events(self):
681         self.events = arvados.events.subscribe(
682             self._api_client,
683             [["event_type", "in", ["create", "update", "delete"]]],
684             self.on_event)
685
686     @on_event_time.time()
687     @catch_exceptions
688     def on_event(self, ev):
689         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
690             return
691         with llfuse.lock:
692             properties = ev.get("properties") or {}
693             old_attrs = properties.get("old_attributes") or {}
694             new_attrs = properties.get("new_attributes") or {}
695
696             for item in self.inodes.find_by_uuid(ev["object_uuid"]):
697                 item.invalidate()
698
699             oldowner = old_attrs.get("owner_uuid")
700             newowner = ev.get("object_owner_uuid")
701             for parent in (
702                     self.inodes.find_by_uuid(oldowner) +
703                     self.inodes.find_by_uuid(newowner)):
704                 parent.invalidate()
705
706     @getattr_time.time()
707     @catch_exceptions
708     def getattr(self, inode, ctx=None):
709         if inode not in self.inodes:
710             _logger.debug("arv-mount getattr: inode %i missing", inode)
711             raise llfuse.FUSEError(errno.ENOENT)
712
713         e = self.inodes[inode]
714         self.inodes.touch(e)
715         parent = None
716         if e.parent_inode:
717             parent = self.inodes[e.parent_inode]
718             self.inodes.touch(parent)
719
720         entry = llfuse.EntryAttributes()
721         entry.st_ino = inode
722         entry.generation = 0
723         entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
724         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
725
726         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
727         if isinstance(e, Directory):
728             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
729         else:
730             entry.st_mode |= stat.S_IFREG
731             if isinstance(e, FuseArvadosFile):
732                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
733
734         if self.enable_write and e.writable():
735             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
736
737         entry.st_nlink = 1
738         entry.st_uid = self.uid
739         entry.st_gid = self.gid
740         entry.st_rdev = 0
741
742         entry.st_size = e.size()
743
744         entry.st_blksize = 512
745         entry.st_blocks = (entry.st_size // 512) + 1
746         if hasattr(entry, 'st_atime_ns'):
747             # llfuse >= 0.42
748             entry.st_atime_ns = int(e.atime() * 1000000000)
749             entry.st_mtime_ns = int(e.mtime() * 1000000000)
750             entry.st_ctime_ns = int(e.mtime() * 1000000000)
751         else:
752             # llfuse < 0.42
753             entry.st_atime = int(e.atime)
754             entry.st_mtime = int(e.mtime)
755             entry.st_ctime = int(e.mtime)
756
757         return entry
758
759     @setattr_time.time()
760     @catch_exceptions
761     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
762         entry = self.getattr(inode)
763
764         if fh is not None and fh in self._filehandles:
765             handle = self._filehandles[fh]
766             e = handle.obj
767         else:
768             e = self.inodes[inode]
769
770         if fields is None:
771             # llfuse < 0.42
772             update_size = attr.st_size is not None
773         else:
774             # llfuse >= 0.42
775             update_size = fields.update_size
776         if update_size and isinstance(e, FuseArvadosFile):
777             with llfuse.lock_released:
778                 e.arvfile.truncate(attr.st_size)
779                 entry.st_size = e.arvfile.size()
780
781         return entry
782
783     @lookup_time.time()
784     @catch_exceptions
785     def lookup(self, parent_inode, name, ctx=None):
786         name = str(name, self.inodes.encoding)
787         inode = None
788
789         if name == '.':
790             inode = parent_inode
791         elif parent_inode in self.inodes:
792             p = self.inodes[parent_inode]
793             self.inodes.touch(p)
794             if name == '..':
795                 inode = p.parent_inode
796             elif isinstance(p, Directory) and name in p:
797                 if p[name].inode is None:
798                     _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
799                                   parent_inode, name)
800                     raise llfuse.FUSEError(errno.ENOENT)
801
802                 inode = p[name].inode
803
804         if inode != None:
805             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
806                       parent_inode, name, inode)
807             self.inodes.touch(self.inodes[inode])
808             self.inodes[inode].inc_ref()
809             return self.getattr(inode)
810         else:
811             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
812                       parent_inode, name)
813             raise llfuse.FUSEError(errno.ENOENT)
814
815     @forget_time.time()
816     @catch_exceptions
817     def forget(self, inodes):
818         if self._shutdown_started.is_set():
819             return
820         for inode, nlookup in inodes:
821             ent = self.inodes[inode]
822             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
823             if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
824                 self.inodes.del_entry(ent)
825
826     @open_time.time()
827     @catch_exceptions
828     def open(self, inode, flags, ctx=None):
829         if inode in self.inodes:
830             p = self.inodes[inode]
831         else:
832             _logger.debug("arv-mount open: inode %i missing", inode)
833             raise llfuse.FUSEError(errno.ENOENT)
834
835         if isinstance(p, Directory):
836             raise llfuse.FUSEError(errno.EISDIR)
837
838         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
839             raise llfuse.FUSEError(errno.EPERM)
840
841         fh = next(self._filehandles_counter)
842         self._filehandles[fh] = FileHandle(fh, p)
843         self.inodes.touch(p)
844
845         # Normally, we will have received an "update" event if the
846         # parent collection is stale here. However, even if the parent
847         # collection hasn't changed, the manifest might have been
848         # fetched so long ago that the signatures on the data block
849         # locators have expired. Calling checkupdate() on all
850         # ancestors ensures the signatures will be refreshed if
851         # necessary.
852         while p.parent_inode in self.inodes:
853             if p == self.inodes[p.parent_inode]:
854                 break
855             p = self.inodes[p.parent_inode]
856             self.inodes.touch(p)
857             p.checkupdate()
858
859         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
860
861         return fh
862
863     @read_time.time()
864     @catch_exceptions
865     def read(self, fh, off, size):
866         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
867         self.read_ops_counter.add(1)
868
869         if fh in self._filehandles:
870             handle = self._filehandles[fh]
871         else:
872             raise llfuse.FUSEError(errno.EBADF)
873
874         self.inodes.touch(handle.obj)
875
876         r = handle.obj.readfrom(off, size, self.num_retries)
877         if r:
878             self.read_counter.add(len(r))
879         return r
880
881     @write_time.time()
882     @catch_exceptions
883     def write(self, fh, off, buf):
884         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
885         self.write_ops_counter.add(1)
886
887         if fh in self._filehandles:
888             handle = self._filehandles[fh]
889         else:
890             raise llfuse.FUSEError(errno.EBADF)
891
892         if not handle.obj.writable():
893             raise llfuse.FUSEError(errno.EPERM)
894
895         self.inodes.touch(handle.obj)
896
897         w = handle.obj.writeto(off, buf, self.num_retries)
898         if w:
899             self.write_counter.add(w)
900         return w
901
902     @release_time.time()
903     @catch_exceptions
904     def release(self, fh):
905         if fh in self._filehandles:
906             _logger.debug("arv-mount release fh %i", fh)
907             try:
908                 self._filehandles[fh].flush()
909             except Exception:
910                 raise
911             finally:
912                 self._filehandles[fh].release()
913                 del self._filehandles[fh]
914         self.inodes.cap_cache()
915
916     def releasedir(self, fh):
917         self.release(fh)
918
919     @opendir_time.time()
920     @catch_exceptions
921     def opendir(self, inode, ctx=None):
922         _logger.debug("arv-mount opendir: inode %i", inode)
923
924         if inode in self.inodes:
925             p = self.inodes[inode]
926         else:
927             _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
928             raise llfuse.FUSEError(errno.ENOENT)
929
930         if not isinstance(p, Directory):
931             raise llfuse.FUSEError(errno.ENOTDIR)
932
933         fh = next(self._filehandles_counter)
934         if p.parent_inode in self.inodes:
935             parent = self.inodes[p.parent_inode]
936         else:
937             _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
938             raise llfuse.FUSEError(errno.EIO)
939
940         _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
941
942         # update atime
943         p.inc_use()
944         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
945         p.dec_use()
946         self.inodes.touch(p)
947         return fh
948
949     @readdir_time.time()
950     @catch_exceptions
951     def readdir(self, fh, off):
952         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
953
954         if fh in self._filehandles:
955             handle = self._filehandles[fh]
956         else:
957             raise llfuse.FUSEError(errno.EBADF)
958
959         e = off
960         while e < len(handle.entries):
961             ent = handle.entries[e]
962             if ent[1].inode in self.inodes:
963                 yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
964             e += 1
965
966     @statfs_time.time()
967     @catch_exceptions
968     def statfs(self, ctx=None):
969         st = llfuse.StatvfsData()
970         st.f_bsize = 128 * 1024
971         st.f_blocks = 0
972         st.f_files = 0
973
974         st.f_bfree = 0
975         st.f_bavail = 0
976
977         st.f_ffree = 0
978         st.f_favail = 0
979
980         st.f_frsize = 0
981         return st
982
983     def _check_writable(self, inode_parent):
984         if not self.enable_write:
985             raise llfuse.FUSEError(errno.EROFS)
986
987         if inode_parent in self.inodes:
988             p = self.inodes[inode_parent]
989         else:
990             raise llfuse.FUSEError(errno.ENOENT)
991
992         if not isinstance(p, Directory):
993             raise llfuse.FUSEError(errno.ENOTDIR)
994
995         if not p.writable():
996             raise llfuse.FUSEError(errno.EPERM)
997
998         return p
999
1000     @create_time.time()
1001     @catch_exceptions
1002     def create(self, inode_parent, name, mode, flags, ctx=None):
1003         name = name.decode(encoding=self.inodes.encoding)
1004         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
1005
1006         p = self._check_writable(inode_parent)
1007         p.create(name)
1008
1009         # The file entry should have been implicitly created by callback.
1010         f = p[name]
1011         fh = next(self._filehandles_counter)
1012         self._filehandles[fh] = FileHandle(fh, f)
1013         self.inodes.touch(p)
1014
1015         f.inc_ref()
1016         return (fh, self.getattr(f.inode))
1017
1018     @mkdir_time.time()
1019     @catch_exceptions
1020     def mkdir(self, inode_parent, name, mode, ctx=None):
1021         name = name.decode(encoding=self.inodes.encoding)
1022         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
1023
1024         p = self._check_writable(inode_parent)
1025         p.mkdir(name)
1026
1027         # The dir entry should have been implicitly created by callback.
1028         d = p[name]
1029
1030         d.inc_ref()
1031         return self.getattr(d.inode)
1032
1033     @unlink_time.time()
1034     @catch_exceptions
1035     def unlink(self, inode_parent, name, ctx=None):
1036         name = name.decode(encoding=self.inodes.encoding)
1037         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
1038         p = self._check_writable(inode_parent)
1039         p.unlink(name)
1040
1041     @rmdir_time.time()
1042     @catch_exceptions
1043     def rmdir(self, inode_parent, name, ctx=None):
1044         name = name.decode(encoding=self.inodes.encoding)
1045         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
1046         p = self._check_writable(inode_parent)
1047         p.rmdir(name)
1048
1049     @rename_time.time()
1050     @catch_exceptions
1051     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
1052         name_old = name_old.decode(encoding=self.inodes.encoding)
1053         name_new = name_new.decode(encoding=self.inodes.encoding)
1054         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
1055         src = self._check_writable(inode_parent_old)
1056         dest = self._check_writable(inode_parent_new)
1057         dest.rename(name_old, name_new, src)
1058
1059     @flush_time.time()
1060     @catch_exceptions
1061     def flush(self, fh):
1062         if fh in self._filehandles:
1063             self._filehandles[fh].flush()
1064
1065     def fsync(self, fh, datasync):
1066         self.flush(fh)
1067
1068     def fsyncdir(self, fh, datasync):
1069         self.flush(fh)