21700: Install Bundler system-wide in Rails postinst
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 Implementation note: in the code, the terms 'object', 'entry' and
51 'inode' are used somewhat interchangeably, but generally mean an
52 arvados_fuse.File or arvados_fuse.Directory object which has numeric
53 inode assigned to it and appears in the Inodes._entries dictionary.
54
55 """
56
57 from __future__ import absolute_import
58 from __future__ import division
59 from builtins import next
60 from builtins import str
61 from builtins import object
62 import os
63 import llfuse
64 import errno
65 import stat
66 import threading
67 import arvados
68 import arvados.events
69 import logging
70 import time
71 import threading
72 import itertools
73 import collections
74 import functools
75 import arvados.keep
76 from prometheus_client import Summary
77 import queue
78 from dataclasses import dataclass
79 import typing
80
81 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
82 from .fusefile import File, StringFile, FuseArvadosFile
83
84 _logger = logging.getLogger('arvados.arvados_fuse')
85
86 # Uncomment this to enable llfuse debug logging.
87 # log_handler = logging.StreamHandler()
88 # llogger = logging.getLogger('llfuse')
89 # llogger.addHandler(log_handler)
90 # llogger.setLevel(logging.DEBUG)
91
92 class Handle(object):
93     """Connects a numeric file handle to a File or Directory object that has
94     been opened by the client."""
95
96     def __init__(self, fh, obj):
97         self.fh = fh
98         self.obj = obj
99         self.obj.inc_use()
100
101     def release(self):
102         self.obj.dec_use()
103
104     def flush(self):
105         pass
106
107
108 class FileHandle(Handle):
109     """Connects a numeric file handle to a File  object that has
110     been opened by the client."""
111
112     def flush(self):
113         if self.obj.writable():
114             return self.obj.flush()
115
116
117 class DirectoryHandle(Handle):
118     """Connects a numeric file handle to a Directory object that has
119     been opened by the client.
120
121     DirectoryHandle is used by opendir() and readdir() to get
122     directory listings.  Entries returned by readdir() don't increment
123     the lookup count (kernel references), so increment our internal
124     "use count" to avoid having an item being removed mid-read.
125
126     """
127
128     def __init__(self, fh, dirobj, entries):
129         super(DirectoryHandle, self).__init__(fh, dirobj)
130         self.entries = entries
131
132         for ent in self.entries:
133             ent[1].inc_use()
134
135     def release(self):
136         for ent in self.entries:
137             ent[1].dec_use()
138         super(DirectoryHandle, self).release()
139
140
141 class InodeCache(object):
142     """Records the memory footprint of objects and when they are last used.
143
144     When the cache limit is exceeded, the least recently used objects
145     are cleared.  Clearing the object means discarding its contents to
146     release memory.  The next time the object is accessed, it must be
147     re-fetched from the server.  Note that the inode cache limit is a
148     soft limit; the cache limit may be exceeded if necessary to load
149     very large projects or collections, it may also be exceeded if an
150     inode can't be safely discarded based on kernel lookups
151     (has_ref()) or internal use count (in_use()).
152
153     """
154
155     def __init__(self, cap, min_entries=4):
156         # Standard dictionaries are ordered, but OrderedDict is still better here, see
157         # https://docs.python.org/3.11/library/collections.html#ordereddict-objects
158         # specifically we use move_to_end() which standard dicts don't have.
159         self._cache_entries = collections.OrderedDict()
160         self.cap = cap
161         self._total = 0
162         self.min_entries = min_entries
163
164     def total(self):
165         return self._total
166
167     def evict_candidates(self):
168         """Yield entries that are candidates to be evicted
169         and stop when the cache total has shrunk sufficiently.
170
171         Implements a LRU cache, when an item is added or touch()ed it
172         goes to the back of the OrderedDict, so items in the front are
173         oldest.  The Inodes._remove() function determines if the entry
174         can actually be removed safely.
175
176         """
177
178         if self._total <= self.cap:
179             return
180
181         _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
182
183         # Copy this into a deque for two reasons:
184         #
185         # 1. _cache_entries is modified by unmanage() which is called
186         # by _remove
187         #
188         # 2. popping off the front means the reference goes away
189         # immediately intead of sticking around for the lifetime of
190         # "values"
191         values = collections.deque(self._cache_entries.values())
192
193         while values:
194             if self._total < self.cap or len(self._cache_entries) < self.min_entries:
195                 break
196             yield values.popleft()
197
198     def unmanage(self, entry):
199         """Stop managing an object in the cache.
200
201         This happens when an object is being removed from the inode
202         entries table.
203
204         """
205
206         if entry.inode not in self._cache_entries:
207             return
208
209         # manage cache size running sum
210         self._total -= entry.cache_size
211         entry.cache_size = 0
212
213         # Now forget about it
214         del self._cache_entries[entry.inode]
215
216     def update_cache_size(self, obj):
217         """Update the cache total in response to the footprint of an
218         object changing (usually because it has been loaded or
219         cleared).
220
221         Adds or removes entries to the cache list based on the object
222         cache size.
223
224         """
225
226         if not obj.persisted():
227             return
228
229         if obj.inode in self._cache_entries:
230             self._total -= obj.cache_size
231
232         obj.cache_size = obj.objsize()
233
234         if obj.cache_size > 0 or obj.parent_inode is None:
235             self._total += obj.cache_size
236             self._cache_entries[obj.inode] = obj
237         elif obj.cache_size == 0 and obj.inode in self._cache_entries:
238             del self._cache_entries[obj.inode]
239
240     def touch(self, obj):
241         """Indicate an object was used recently, making it low
242         priority to be removed from the cache.
243
244         """
245         if obj.inode in self._cache_entries:
246             self._cache_entries.move_to_end(obj.inode)
247             return True
248         return False
249
250     def clear(self):
251         self._cache_entries.clear()
252         self._total = 0
253
254 @dataclass
255 class RemoveInode:
256     entry: typing.Union[Directory, File]
257     def inode_op(self, inodes, locked_ops):
258         if locked_ops is None:
259             inodes._remove(self.entry)
260             return True
261         else:
262             locked_ops.append(self)
263             return False
264
265 @dataclass
266 class InvalidateInode:
267     inode: int
268     def inode_op(self, inodes, locked_ops):
269         llfuse.invalidate_inode(self.inode)
270         return True
271
272 @dataclass
273 class InvalidateEntry:
274     inode: int
275     name: str
276     def inode_op(self, inodes, locked_ops):
277         llfuse.invalidate_entry(self.inode, self.name)
278         return True
279
280 @dataclass
281 class EvictCandidates:
282     def inode_op(self, inodes, locked_ops):
283         return True
284
285
286 class Inodes(object):
287     """Manage the set of inodes.
288
289     This is the mapping from a numeric id to a concrete File or
290     Directory object
291
292     """
293
294     def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
295         self._entries = {}
296         self._counter = itertools.count(llfuse.ROOT_INODE)
297         self.inode_cache = inode_cache
298         self.encoding = encoding
299         self._fsns = fsns
300         self._shutdown_started = shutdown_started or threading.Event()
301
302         self._inode_remove_queue = queue.Queue()
303         self._inode_remove_thread = threading.Thread(None, self._inode_remove)
304         self._inode_remove_thread.daemon = True
305         self._inode_remove_thread.start()
306
307         self.cap_cache_event = threading.Event()
308         self._by_uuid = collections.defaultdict(list)
309
310     def __getitem__(self, item):
311         return self._entries[item]
312
313     def __setitem__(self, key, item):
314         self._entries[key] = item
315
316     def __iter__(self):
317         return iter(self._entries.keys())
318
319     def items(self):
320         return self._entries.items()
321
322     def __contains__(self, k):
323         return k in self._entries
324
325     def touch(self, entry):
326         """Update the access time, adjust the cache position, and
327         notify the _inode_remove thread to recheck the cache.
328
329         """
330
331         entry._atime = time.time()
332         if self.inode_cache.touch(entry):
333             self.cap_cache()
334
335     def cap_cache(self):
336         """Notify the _inode_remove thread to recheck the cache."""
337         if not self.cap_cache_event.is_set():
338             self.cap_cache_event.set()
339             self._inode_remove_queue.put(EvictCandidates())
340
341     def update_uuid(self, entry):
342         """Update the Arvados uuid associated with an inode entry.
343
344         This is used to look up inodes that need to be invalidated
345         when a websocket event indicates the object has changed on the
346         API server.
347
348         """
349         if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
350             self._by_uuid[entry.cache_uuid].remove(entry)
351
352         entry.cache_uuid = entry.uuid()
353         if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
354             self._by_uuid[entry.cache_uuid].append(entry)
355
356         if not self._by_uuid[entry.cache_uuid]:
357             del self._by_uuid[entry.cache_uuid]
358
359     def add_entry(self, entry):
360         """Assign a numeric inode to a new entry."""
361
362         entry.inode = next(self._counter)
363         if entry.inode == llfuse.ROOT_INODE:
364             entry.inc_ref()
365         self._entries[entry.inode] = entry
366
367         self.update_uuid(entry)
368         self.inode_cache.update_cache_size(entry)
369         self.cap_cache()
370         return entry
371
372     def del_entry(self, entry):
373         """Remove entry from the inode table.
374
375         Indicate this inode entry is pending deletion by setting
376         parent_inode to None.  Notify the _inode_remove thread to try
377         and remove it.
378
379         """
380
381         entry.parent_inode = None
382         self._inode_remove_queue.put(RemoveInode(entry))
383         _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
384
385     def _inode_remove(self):
386         """Background thread to handle tasks related to invalidating
387         inodes in the kernel, and removing objects from the inodes
388         table entirely.
389
390         """
391
392         locked_ops = collections.deque()
393         while True:
394             blocking_get = True
395             while True:
396                 try:
397                     qentry = self._inode_remove_queue.get(blocking_get)
398                 except queue.Empty:
399                     break
400                 blocking_get = False
401                 if qentry is None:
402                     return
403
404                 if self._shutdown_started.is_set():
405                     continue
406
407                 # Process this entry
408                 if qentry.inode_op(self, locked_ops):
409                     self._inode_remove_queue.task_done()
410
411                 # Give up the reference
412                 qentry = None
413
414             with llfuse.lock:
415                 while locked_ops:
416                     if locked_ops.popleft().inode_op(self, None):
417                         self._inode_remove_queue.task_done()
418                 self.cap_cache_event.clear()
419                 for entry in self.inode_cache.evict_candidates():
420                     self._remove(entry)
421
422     def wait_remove_queue_empty(self):
423         # used by tests
424         self._inode_remove_queue.join()
425
426     def _remove(self, entry):
427         """Remove an inode entry if possible.
428
429         If the entry is still referenced or in use, don't do anything.
430         If this is not referenced but the parent is still referenced,
431         clear any data held by the object (which may include directory
432         entries under the object) but don't remove it from the inode
433         table.
434
435         """
436         try:
437             if entry.inode is None:
438                 # Removed already
439                 return
440
441             if entry.inode == llfuse.ROOT_INODE:
442                 return
443
444             if entry.in_use():
445                 # referenced internally, stay pinned
446                 #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
447                 return
448
449             # Tell the kernel it should forget about it
450             entry.kernel_invalidate()
451
452             if entry.has_ref():
453                 # has kernel reference, could still be accessed.
454                 # when the kernel forgets about it, we can delete it.
455                 #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
456                 return
457
458             # commit any pending changes
459             with llfuse.lock_released:
460                 entry.finalize()
461
462             # Clear the contents
463             entry.clear()
464
465             if entry.parent_inode is None:
466                 _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
467                               entry.inode, entry.cache_size, self.inode_cache.total(),
468                               len(self._entries), type(entry))
469
470                 if entry.cache_uuid:
471                     self._by_uuid[entry.cache_uuid].remove(entry)
472                     if not self._by_uuid[entry.cache_uuid]:
473                         del self._by_uuid[entry.cache_uuid]
474                     entry.cache_uuid = None
475
476                 self.inode_cache.unmanage(entry)
477
478                 del self._entries[entry.inode]
479                 entry.inode = None
480
481         except Exception as e:
482             _logger.exception("failed remove")
483
484     def invalidate_inode(self, entry):
485         if entry.has_ref():
486             # Only necessary if the kernel has previously done a lookup on this
487             # inode and hasn't yet forgotten about it.
488             self._inode_remove_queue.put(InvalidateInode(entry.inode))
489
490     def invalidate_entry(self, entry, name):
491         if entry.has_ref():
492             # Only necessary if the kernel has previously done a lookup on this
493             # inode and hasn't yet forgotten about it.
494             self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
495
496     def begin_shutdown(self):
497         self._inode_remove_queue.put(None)
498         if self._inode_remove_thread is not None:
499             self._inode_remove_thread.join()
500         self._inode_remove_thread = None
501
502     def clear(self):
503         with llfuse.lock_released:
504             self.begin_shutdown()
505
506         self.inode_cache.clear()
507         self._by_uuid.clear()
508
509         for k,v in self._entries.items():
510             try:
511                 v.finalize()
512             except Exception as e:
513                 _logger.exception("Error during finalize of inode %i", k)
514
515         self._entries.clear()
516
517     def forward_slash_subst(self):
518         return self._fsns
519
520     def find_by_uuid(self, uuid):
521         """Return a list of zero or more inode entries corresponding
522         to this Arvados UUID."""
523         return self._by_uuid.get(uuid, [])
524
525
526 def catch_exceptions(orig_func):
527     """Catch uncaught exceptions and log them consistently."""
528
529     @functools.wraps(orig_func)
530     def catch_exceptions_wrapper(self, *args, **kwargs):
531         try:
532             return orig_func(self, *args, **kwargs)
533         except llfuse.FUSEError:
534             raise
535         except EnvironmentError as e:
536             raise llfuse.FUSEError(e.errno)
537         except NotImplementedError:
538             raise llfuse.FUSEError(errno.ENOTSUP)
539         except arvados.errors.KeepWriteError as e:
540             _logger.error("Keep write error: " + str(e))
541             raise llfuse.FUSEError(errno.EIO)
542         except arvados.errors.NotFoundError as e:
543             _logger.error("Block not found error: " + str(e))
544             raise llfuse.FUSEError(errno.EIO)
545         except:
546             _logger.exception("Unhandled exception during FUSE operation")
547             raise llfuse.FUSEError(errno.EIO)
548
549     return catch_exceptions_wrapper
550
551
552 class Operations(llfuse.Operations):
553     """This is the main interface with llfuse.
554
555     The methods on this object are called by llfuse threads to service FUSE
556     events to query and read from the file system.
557
558     llfuse has its own global lock which is acquired before calling a request handler,
559     so request handlers do not run concurrently unless the lock is explicitly released
560     using 'with llfuse.lock_released:'
561
562     """
563
564     fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
565     read_time = fuse_time.labels(op='read')
566     write_time = fuse_time.labels(op='write')
567     destroy_time = fuse_time.labels(op='destroy')
568     on_event_time = fuse_time.labels(op='on_event')
569     getattr_time = fuse_time.labels(op='getattr')
570     setattr_time = fuse_time.labels(op='setattr')
571     lookup_time = fuse_time.labels(op='lookup')
572     forget_time = fuse_time.labels(op='forget')
573     open_time = fuse_time.labels(op='open')
574     release_time = fuse_time.labels(op='release')
575     opendir_time = fuse_time.labels(op='opendir')
576     readdir_time = fuse_time.labels(op='readdir')
577     statfs_time = fuse_time.labels(op='statfs')
578     create_time = fuse_time.labels(op='create')
579     mkdir_time = fuse_time.labels(op='mkdir')
580     unlink_time = fuse_time.labels(op='unlink')
581     rmdir_time = fuse_time.labels(op='rmdir')
582     rename_time = fuse_time.labels(op='rename')
583     flush_time = fuse_time.labels(op='flush')
584
585     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
586         super(Operations, self).__init__()
587
588         self._api_client = api_client
589
590         if not inode_cache:
591             inode_cache = InodeCache(cap=256*1024*1024)
592
593         if fsns is None:
594             try:
595                 fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
596             except KeyError:
597                 # old API server with no FSNS config
598                 fsns = '_'
599             else:
600                 if fsns == '' or fsns == '/':
601                     fsns = None
602
603         # If we get overlapping shutdown events (e.g., fusermount -u
604         # -z and operations.destroy()) llfuse calls forget() on inodes
605         # that have already been deleted. To avoid this, we make
606         # forget() a no-op if called after destroy().
607         self._shutdown_started = threading.Event()
608
609         self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
610                              shutdown_started=self._shutdown_started)
611         self.uid = uid
612         self.gid = gid
613         self.enable_write = enable_write
614
615         # dict of inode to filehandle
616         self._filehandles = {}
617         self._filehandles_counter = itertools.count(0)
618
619         # Other threads that need to wait until the fuse driver
620         # is fully initialized should wait() on this event object.
621         self.initlock = threading.Event()
622
623         self.num_retries = num_retries
624
625         self.read_counter = arvados.keep.Counter()
626         self.write_counter = arvados.keep.Counter()
627         self.read_ops_counter = arvados.keep.Counter()
628         self.write_ops_counter = arvados.keep.Counter()
629
630         self.events = None
631
632     def init(self):
633         # Allow threads that are waiting for the driver to be finished
634         # initializing to continue
635         self.initlock.set()
636
637     def metric_samples(self):
638         return self.fuse_time.collect()[0].samples
639
640     def metric_op_names(self):
641         ops = []
642         for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
643             if cur_op not in ops:
644                 ops.append(cur_op)
645         return ops
646
647     def metric_value(self, opname, metric):
648         op_value = [sample.value for sample in self.metric_samples()
649                     if sample.name == metric and sample.labels['op'] == opname]
650         return op_value[0] if len(op_value) == 1 else None
651
652     def metric_sum_func(self, opname):
653         return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
654
655     def metric_count_func(self, opname):
656         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
657
658     def begin_shutdown(self):
659         self._shutdown_started.set()
660         self.inodes.begin_shutdown()
661
662     @destroy_time.time()
663     @catch_exceptions
664     def destroy(self):
665         _logger.debug("arv-mount destroy: start")
666
667         with llfuse.lock_released:
668             self.begin_shutdown()
669
670         if self.events:
671             self.events.close()
672             self.events = None
673
674         self.inodes.clear()
675
676         _logger.debug("arv-mount destroy: complete")
677
678
679     def access(self, inode, mode, ctx):
680         return True
681
682     def listen_for_events(self):
683         self.events = arvados.events.subscribe(
684             self._api_client,
685             [["event_type", "in", ["create", "update", "delete"]]],
686             self.on_event)
687
688     @on_event_time.time()
689     @catch_exceptions
690     def on_event(self, ev):
691         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
692             return
693         with llfuse.lock:
694             properties = ev.get("properties") or {}
695             old_attrs = properties.get("old_attributes") or {}
696             new_attrs = properties.get("new_attributes") or {}
697
698             for item in self.inodes.find_by_uuid(ev["object_uuid"]):
699                 item.invalidate()
700
701             oldowner = old_attrs.get("owner_uuid")
702             newowner = ev.get("object_owner_uuid")
703             for parent in (
704                     self.inodes.find_by_uuid(oldowner) +
705                     self.inodes.find_by_uuid(newowner)):
706                 parent.invalidate()
707
708     @getattr_time.time()
709     @catch_exceptions
710     def getattr(self, inode, ctx=None):
711         if inode not in self.inodes:
712             _logger.debug("arv-mount getattr: inode %i missing", inode)
713             raise llfuse.FUSEError(errno.ENOENT)
714
715         e = self.inodes[inode]
716         self.inodes.touch(e)
717         parent = None
718         if e.parent_inode:
719             parent = self.inodes[e.parent_inode]
720             self.inodes.touch(parent)
721
722         entry = llfuse.EntryAttributes()
723         entry.st_ino = inode
724         entry.generation = 0
725         entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
726         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
727
728         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
729         if isinstance(e, Directory):
730             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
731         else:
732             entry.st_mode |= stat.S_IFREG
733             if isinstance(e, FuseArvadosFile):
734                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
735
736         if self.enable_write and e.writable():
737             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
738
739         entry.st_nlink = 1
740         entry.st_uid = self.uid
741         entry.st_gid = self.gid
742         entry.st_rdev = 0
743
744         entry.st_size = e.size()
745
746         entry.st_blksize = 512
747         entry.st_blocks = (entry.st_size // 512) + 1
748         if hasattr(entry, 'st_atime_ns'):
749             # llfuse >= 0.42
750             entry.st_atime_ns = int(e.atime() * 1000000000)
751             entry.st_mtime_ns = int(e.mtime() * 1000000000)
752             entry.st_ctime_ns = int(e.mtime() * 1000000000)
753         else:
754             # llfuse < 0.42
755             entry.st_atime = int(e.atime)
756             entry.st_mtime = int(e.mtime)
757             entry.st_ctime = int(e.mtime)
758
759         return entry
760
761     @setattr_time.time()
762     @catch_exceptions
763     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
764         entry = self.getattr(inode)
765
766         if fh is not None and fh in self._filehandles:
767             handle = self._filehandles[fh]
768             e = handle.obj
769         else:
770             e = self.inodes[inode]
771
772         if fields is None:
773             # llfuse < 0.42
774             update_size = attr.st_size is not None
775         else:
776             # llfuse >= 0.42
777             update_size = fields.update_size
778         if update_size and isinstance(e, FuseArvadosFile):
779             with llfuse.lock_released:
780                 e.arvfile.truncate(attr.st_size)
781                 entry.st_size = e.arvfile.size()
782
783         return entry
784
785     @lookup_time.time()
786     @catch_exceptions
787     def lookup(self, parent_inode, name, ctx=None):
788         name = str(name, self.inodes.encoding)
789         inode = None
790
791         if name == '.':
792             inode = parent_inode
793         elif parent_inode in self.inodes:
794             p = self.inodes[parent_inode]
795             self.inodes.touch(p)
796             if name == '..':
797                 inode = p.parent_inode
798             elif isinstance(p, Directory) and name in p:
799                 if p[name].inode is None:
800                     _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
801                                   parent_inode, name)
802                     raise llfuse.FUSEError(errno.ENOENT)
803
804                 inode = p[name].inode
805
806         if inode != None:
807             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
808                       parent_inode, name, inode)
809             self.inodes.touch(self.inodes[inode])
810             self.inodes[inode].inc_ref()
811             return self.getattr(inode)
812         else:
813             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
814                       parent_inode, name)
815             raise llfuse.FUSEError(errno.ENOENT)
816
817     @forget_time.time()
818     @catch_exceptions
819     def forget(self, inodes):
820         if self._shutdown_started.is_set():
821             return
822         for inode, nlookup in inodes:
823             ent = self.inodes[inode]
824             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
825             if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
826                 self.inodes.del_entry(ent)
827
828     @open_time.time()
829     @catch_exceptions
830     def open(self, inode, flags, ctx=None):
831         if inode in self.inodes:
832             p = self.inodes[inode]
833         else:
834             _logger.debug("arv-mount open: inode %i missing", inode)
835             raise llfuse.FUSEError(errno.ENOENT)
836
837         if isinstance(p, Directory):
838             raise llfuse.FUSEError(errno.EISDIR)
839
840         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
841             raise llfuse.FUSEError(errno.EPERM)
842
843         fh = next(self._filehandles_counter)
844         self._filehandles[fh] = FileHandle(fh, p)
845         self.inodes.touch(p)
846
847         # Normally, we will have received an "update" event if the
848         # parent collection is stale here. However, even if the parent
849         # collection hasn't changed, the manifest might have been
850         # fetched so long ago that the signatures on the data block
851         # locators have expired. Calling checkupdate() on all
852         # ancestors ensures the signatures will be refreshed if
853         # necessary.
854         while p.parent_inode in self.inodes:
855             if p == self.inodes[p.parent_inode]:
856                 break
857             p = self.inodes[p.parent_inode]
858             self.inodes.touch(p)
859             p.checkupdate()
860
861         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
862
863         return fh
864
865     @read_time.time()
866     @catch_exceptions
867     def read(self, fh, off, size):
868         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
869         self.read_ops_counter.add(1)
870
871         if fh in self._filehandles:
872             handle = self._filehandles[fh]
873         else:
874             raise llfuse.FUSEError(errno.EBADF)
875
876         self.inodes.touch(handle.obj)
877
878         r = handle.obj.readfrom(off, size, self.num_retries)
879         if r:
880             self.read_counter.add(len(r))
881         return r
882
883     @write_time.time()
884     @catch_exceptions
885     def write(self, fh, off, buf):
886         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
887         self.write_ops_counter.add(1)
888
889         if fh in self._filehandles:
890             handle = self._filehandles[fh]
891         else:
892             raise llfuse.FUSEError(errno.EBADF)
893
894         if not handle.obj.writable():
895             raise llfuse.FUSEError(errno.EPERM)
896
897         self.inodes.touch(handle.obj)
898
899         w = handle.obj.writeto(off, buf, self.num_retries)
900         if w:
901             self.write_counter.add(w)
902         return w
903
904     @release_time.time()
905     @catch_exceptions
906     def release(self, fh):
907         if fh in self._filehandles:
908             _logger.debug("arv-mount release fh %i", fh)
909             try:
910                 self._filehandles[fh].flush()
911             except Exception:
912                 raise
913             finally:
914                 self._filehandles[fh].release()
915                 del self._filehandles[fh]
916         self.inodes.cap_cache()
917
918     def releasedir(self, fh):
919         self.release(fh)
920
921     @opendir_time.time()
922     @catch_exceptions
923     def opendir(self, inode, ctx=None):
924         _logger.debug("arv-mount opendir: inode %i", inode)
925
926         if inode in self.inodes:
927             p = self.inodes[inode]
928         else:
929             _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
930             raise llfuse.FUSEError(errno.ENOENT)
931
932         if not isinstance(p, Directory):
933             raise llfuse.FUSEError(errno.ENOTDIR)
934
935         fh = next(self._filehandles_counter)
936         if p.parent_inode in self.inodes:
937             parent = self.inodes[p.parent_inode]
938         else:
939             _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
940             raise llfuse.FUSEError(errno.EIO)
941
942         _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
943
944         # update atime
945         p.inc_use()
946         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
947         p.dec_use()
948         self.inodes.touch(p)
949         return fh
950
951     @readdir_time.time()
952     @catch_exceptions
953     def readdir(self, fh, off):
954         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
955
956         if fh in self._filehandles:
957             handle = self._filehandles[fh]
958         else:
959             raise llfuse.FUSEError(errno.EBADF)
960
961         e = off
962         while e < len(handle.entries):
963             ent = handle.entries[e]
964             if ent[1].inode in self.inodes:
965                 yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
966             e += 1
967
968     @statfs_time.time()
969     @catch_exceptions
970     def statfs(self, ctx=None):
971         st = llfuse.StatvfsData()
972         st.f_bsize = 128 * 1024
973         st.f_blocks = 0
974         st.f_files = 0
975
976         st.f_bfree = 0
977         st.f_bavail = 0
978
979         st.f_ffree = 0
980         st.f_favail = 0
981
982         st.f_frsize = 0
983         return st
984
985     def _check_writable(self, inode_parent):
986         if not self.enable_write:
987             raise llfuse.FUSEError(errno.EROFS)
988
989         if inode_parent in self.inodes:
990             p = self.inodes[inode_parent]
991         else:
992             raise llfuse.FUSEError(errno.ENOENT)
993
994         if not isinstance(p, Directory):
995             raise llfuse.FUSEError(errno.ENOTDIR)
996
997         if not p.writable():
998             raise llfuse.FUSEError(errno.EPERM)
999
1000         return p
1001
1002     @create_time.time()
1003     @catch_exceptions
1004     def create(self, inode_parent, name, mode, flags, ctx=None):
1005         name = name.decode(encoding=self.inodes.encoding)
1006         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
1007
1008         p = self._check_writable(inode_parent)
1009         p.create(name)
1010
1011         # The file entry should have been implicitly created by callback.
1012         f = p[name]
1013         fh = next(self._filehandles_counter)
1014         self._filehandles[fh] = FileHandle(fh, f)
1015         self.inodes.touch(p)
1016
1017         f.inc_ref()
1018         return (fh, self.getattr(f.inode))
1019
1020     @mkdir_time.time()
1021     @catch_exceptions
1022     def mkdir(self, inode_parent, name, mode, ctx=None):
1023         name = name.decode(encoding=self.inodes.encoding)
1024         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
1025
1026         p = self._check_writable(inode_parent)
1027         p.mkdir(name)
1028
1029         # The dir entry should have been implicitly created by callback.
1030         d = p[name]
1031
1032         d.inc_ref()
1033         return self.getattr(d.inode)
1034
1035     @unlink_time.time()
1036     @catch_exceptions
1037     def unlink(self, inode_parent, name, ctx=None):
1038         name = name.decode(encoding=self.inodes.encoding)
1039         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
1040         p = self._check_writable(inode_parent)
1041         p.unlink(name)
1042
1043     @rmdir_time.time()
1044     @catch_exceptions
1045     def rmdir(self, inode_parent, name, ctx=None):
1046         name = name.decode(encoding=self.inodes.encoding)
1047         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
1048         p = self._check_writable(inode_parent)
1049         p.rmdir(name)
1050
1051     @rename_time.time()
1052     @catch_exceptions
1053     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
1054         name_old = name_old.decode(encoding=self.inodes.encoding)
1055         name_new = name_new.decode(encoding=self.inodes.encoding)
1056         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
1057         src = self._check_writable(inode_parent_old)
1058         dest = self._check_writable(inode_parent_new)
1059         dest.rename(name_old, name_new, src)
1060
1061     @flush_time.time()
1062     @catch_exceptions
1063     def flush(self, fh):
1064         if fh in self._filehandles:
1065             self._filehandles[fh].flush()
1066
1067     def fsync(self, fh, datasync):
1068         self.flush(fh)
1069
1070     def fsyncdir(self, fh, datasync):
1071         self.flush(fh)