1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (Any such characters in a collection filename will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
32 class Directory(FreshBase):
33     """Generic directory object, backed by a dict.
34
35     Consists of a set of entries with the key representing the filename
36     and the value referencing a File or Directory object.
37     """
38
39     __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
40
41     def __init__(self, parent_inode, inodes, enable_write, filters):
42         """parent_inode is the integer inode number"""
43
44         super(Directory, self).__init__()
45
46         self.inode = None
47         if not isinstance(parent_inode, int):
48             raise Exception("parent_inode should be an int")
49         self.parent_inode = parent_inode
50         self.inodes = inodes
51         self._entries = {}
52         self._mtime = time.time()
53         self._enable_write = enable_write
54         self._filters = filters or []
55
56     def _filters_for(self, subtype, *, qualified):
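        """Yield the filters in self._filters that apply to one resource type.

        Each filter is an [attribute, operator, operand] triple.  Filters
        whose attribute has no "type." prefix apply to every resource type
        and are yielded unchanged.  Filters whose attribute is prefixed with
        a resource type (for example "collections.name") are yielded only
        when the prefix matches `subtype`, with the prefix kept
        (qualified=True) or stripped (qualified=False).
        """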
57         for f in self._filters:
58             f_type, _, f_name = f[0].partition('.')
59             if not f_name:
60                 yield f
61             elif f_type != subtype:
62                 pass
63             elif qualified:
64                 yield f
65             else:
66                 yield [f_name, *f[1:]]
67
68     def unsanitize_filename(self, incoming):
69         """Replace ForwardSlashNameSubstitution value with /"""
70         fsns = self.inodes.forward_slash_subst()
71         if isinstance(fsns, str):
72             return incoming.replace(fsns, '/')
73         else:
74             return incoming
75
76     def sanitize_filename(self, dirty):
77         """Replace disallowed filename characters according to
78         ForwardSlashNameSubstitution in self.api_config."""
79         # '.' and '..' are not reachable if API server is newer than #6277
80         if dirty is None:
81             return None
82         elif dirty == '':
83             return '_'
84         elif dirty == '.':
85             return '_'
86         elif dirty == '..':
87             return '__'
88         else:
89             fsns = self.inodes.forward_slash_subst()
90             if isinstance(fsns, str):
91                 dirty = dirty.replace('/', fsns)
92             return _disallowed_filename_characters.sub('_', dirty)
93
94
95     #  Overridden by subclasses to implement logic to update the
96     #  entries dict when the directory is stale
97     @use_counter
98     def update(self):
99         pass
100
101     # Only used when computing the size of the disk footprint of the directory
102     # (stub)
103     def size(self):
104         return 0
105
106     def persisted(self):
107         return False
108
109     def checkupdate(self):
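        """Refresh this directory if it is stale.

        HTTP errors from the API server are logged and otherwise ignored, so
        a transient failure does not break access to the directory.
        """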
110         if self.stale():
111             try:
112                 self.update()
113             except apiclient.errors.HttpError as e:
114                 _logger.warning(e)
115
116     @use_counter
117     @check_update
118     def __getitem__(self, item):
119         return self._entries[item]
120
121     @use_counter
122     @check_update
123     def items(self):
124         return list(self._entries.items())
125
126     @use_counter
127     @check_update
128     def __contains__(self, k):
129         return k in self._entries
130
131     @use_counter
132     @check_update
133     def __len__(self):
134         return len(self._entries)
135
136     def fresh(self):
137         self.inodes.touch(self)
138         super(Directory, self).fresh()
139
140     def objsize(self):
141         # Rough estimate of memory footprint, based on measurements with pympler
142         return len(self._entries) * 1024
143
144     def merge(self, items, fn, same, new_entry):
145         """Helper method for updating the contents of the directory.
146
147         Takes a list describing the new contents of the directory, reuses
148         entries that are the same in both the old and new lists, creates new
149         entries, and deletes old entries missing from the new list.
150
151         Arguments:
152         * items: Iterable --- New directory contents
153
154         * fn: Callable --- Takes an entry in 'items' and returns the desired file or
155         directory name, or None if this entry should be skipped.
156
157         * same: Callable --- Compare an existing entry (a File or Directory
158         object) with an entry in the items list to determine whether to keep
159         the existing entry.
160
161         * new_entry: Callable --- Create a new directory entry (File or Directory
162         object) from an entry in the items list.
163
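        Example (illustrative, mirroring the call in TagDirectory.update):

            self.merge(
                taggedcollections['items'],
                lambda i: i['head_uuid'],
                lambda a, i: a.collection_locator == i['head_uuid'],
                lambda i: CollectionDirectory(self.inode, self.inodes, ...))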
164         """
165
166         oldentries = self._entries
167         self._entries = {}
168         changed = False
169         for i in items:
170             name = self.sanitize_filename(fn(i))
171             if not name:
172                 continue
173             if name in oldentries:
174                 ent = oldentries[name]
175                 if same(ent, i) and ent.parent_inode == self.inode:
176                     # move existing directory entry over
177                     self._entries[name] = ent
178                     del oldentries[name]
179                     self.inodes.inode_cache.touch(ent)
180
181         for i in items:
182             name = self.sanitize_filename(fn(i))
183             if not name:
184                 continue
185             if name not in self._entries:
186                 # create new directory entry
187                 ent = new_entry(i)
188                 if ent is not None:
189                     self._entries[name] = self.inodes.add_entry(ent)
190                     # need to invalidate this just in case there was a
191                     # previous entry that couldn't be moved over or a
192                     # lookup that returned file not found and cached
193                     # a negative result
194                     self.inodes.invalidate_entry(self, name)
195                     changed = True
196                     _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
197
198         # delete any other directory entries that were not found in 'items'
199         for name, ent in oldentries.items():
200             _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
201             self.inodes.invalidate_entry(self, name)
202             self.inodes.del_entry(ent)
203             changed = True
204
205         if changed:
206             self._mtime = time.time()
207             self.inodes.inode_cache.update_cache_size(self)
208
209         self.fresh()
210
211     def in_use(self):
212         if super(Directory, self).in_use():
213             return True
214         for v in self._entries.values():
215             if v.in_use():
216                 return True
217         return False
218
219     def clear(self):
220         """Delete all entries"""
221         if not self._entries:
222             return
223         oldentries = self._entries
224         self._entries = {}
225         self.invalidate()
226         for name, ent in oldentries.items():
227             ent.clear()
228             self.inodes.invalidate_entry(self, name)
229             self.inodes.del_entry(ent)
230         self.inodes.inode_cache.update_cache_size(self)
231
232     def kernel_invalidate(self):
233         # Invalidating the dentry on the parent implies invalidating all paths
234         # below it as well.
235         if self.parent_inode in self.inodes:
236             parent = self.inodes[self.parent_inode]
237         else:
238             # parent was removed already.
239             return
240
241         # Find self on the parent in order to invalidate this path.
242         # Calling the public items() method might trigger a refresh,
243         # which we definitely don't want, so read the internal dict directly.
244         for k,v in parent._entries.items():
245             if v is self:
246                 self.inodes.invalidate_entry(parent, k)
247                 break
248
249     def mtime(self):
250         return self._mtime
251
252     def writable(self):
253         return False
254
255     def flush(self):
256         pass
257
258     def want_event_subscribe(self):
259         raise NotImplementedError()
260
261     def create(self, name):
262         raise NotImplementedError()
263
264     def mkdir(self, name):
265         raise NotImplementedError()
266
267     def unlink(self, name):
268         raise NotImplementedError()
269
270     def rmdir(self, name):
271         raise NotImplementedError()
272
273     def rename(self, name_old, name_new, src):
274         raise NotImplementedError()
275
276
277 class CollectionDirectoryBase(Directory):
278     """Represent an Arvados Collection as a directory.
279
280     This class is used for Subcollections, and is also the base class for
281     CollectionDirectory, which implements collection loading/saving on
282     Collection records.
283
284     Most operations act only on the underlying Arvados `Collection` object.  The
285     `Collection` object signals via a notify callback to
286     `CollectionDirectoryBase.on_event` that an item was added, removed or
287     modified.  FUSE inodes and directory entries are created, deleted or
288     invalidated in response to these events.
289
290     """
291
292     __slots__ = ("collection", "collection_root", "collection_record_file")
293
294     def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
295         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
296         self.collection = collection
297         self.collection_root = collection_root
298         self.collection_record_file = None
299
300     def new_entry(self, name, item, mtime):
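        """Create or re-attach the FUSE entry for one collection item.

        If the item already has a detached fuse_entry, it is re-parented onto
        this directory; otherwise a new CollectionDirectoryBase (for
        subcollections) or FuseArvadosFile (for files) entry is created and
        registered with the inode table.
        """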
301         name = self.sanitize_filename(name)
302         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
303             if item.fuse_entry.parent_inode is not None:
304                 raise Exception("Can only reparent unparented inode entry")
305             if item.fuse_entry.inode is None:
306                 raise Exception("Reparented entry must still have valid inode")
307             item.fuse_entry.parent_inode = self.inode
308             self._entries[name] = item.fuse_entry
309         elif isinstance(item, arvados.collection.RichCollectionBase):
310             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
311                 self.inode,
312                 self.inodes,
313                 self._enable_write,
314                 self._filters,
315                 item,
316                 self.collection_root,
317             ))
318             self._entries[name].populate(mtime)
319         else:
320             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write, self._poll, self._poll_time))
321         item.fuse_entry = self._entries[name]
322
323     def on_event(self, event, collection, name, item):
324
325         # These are events (ADD/DEL/MOD) emitted by operations on the
326         # Collection object (like "mkdirs" or "remove") and by "update";
327         # we need to synchronize them with the FUSE objects that have
328         # been assigned inodes.
329         if collection != self.collection:
330             return
331
332         name = self.sanitize_filename(name)
333
334         #
335         # It's possible for another thread to have llfuse.lock and
336         # be waiting on collection.lock.  Meanwhile, we released
337         # llfuse.lock earlier in the stack, but are still holding
338         # on to the collection lock, and now we need to re-acquire
339         # llfuse.lock.  If we don't release the collection lock,
340         # we'll deadlock where we're holding the collection lock
341         # waiting for llfuse.lock and the other thread is holding
342         # llfuse.lock and waiting for the collection lock.
343         #
344         # The correct locking order here is to take llfuse.lock
345         # first, then the collection lock.
346         #
347         # Since collection.lock is an RLock, it might be locked
348         # multiple times, so we need to release it multiple times,
349         # keep a count, then re-lock it the correct number of
350         # times.
351         #
352         lockcount = 0
353         try:
354             while True:
355                 self.collection.lock.release()
356                 lockcount += 1
357         except RuntimeError:
358             pass
359
360         try:
361             with llfuse.lock:
362                 with self.collection.lock:
363                     if event == arvados.collection.ADD:
364                         self.new_entry(name, item, self.mtime())
365                     elif event == arvados.collection.DEL:
366                         ent = self._entries[name]
367                         del self._entries[name]
368                         self.inodes.invalidate_entry(self, name)
369                         self.inodes.del_entry(ent)
370                     elif event == arvados.collection.MOD:
371                         # MOD events have (modified_from, newitem)
372                         newitem = item[1]
373                         entry = None
374                         if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
375                             entry = newitem.fuse_entry
376                         elif name in self._entries:
377                             entry = self._entries[name]
378
379                         if entry is not None:
380                             entry.invalidate()
381                             self.inodes.invalidate_inode(entry)
382
383                         if name in self._entries:
384                             self.inodes.invalidate_entry(self, name)
385
386                     # TOK and WRITE events just invalidate the
387                     # collection record file.
388
389                     if self.collection_record_file is not None:
390                         self.collection_record_file.invalidate()
391                         self.inodes.invalidate_inode(self.collection_record_file)
392         finally:
393             while lockcount > 0:
394                 self.collection.lock.acquire()
395                 lockcount -= 1
396
397     def populate(self, mtime):
398         self._mtime = mtime
399         with self.collection.lock:
400             self.collection.subscribe(self.on_event)
401             for entry, item in self.collection.items():
402                 self.new_entry(entry, item, self.mtime())
403
404     def writable(self):
405         return self._enable_write and self.collection.writable()
406
407     @use_counter
408     def flush(self):
409         self.collection_root.flush()
410
411     @use_counter
412     @check_update
413     def create(self, name):
414         if not self.writable():
415             raise llfuse.FUSEError(errno.EROFS)
416         with llfuse.lock_released:
417             self.collection.open(name, "w").close()
418
419     @use_counter
420     @check_update
421     def mkdir(self, name):
422         if not self.writable():
423             raise llfuse.FUSEError(errno.EROFS)
424         with llfuse.lock_released:
425             self.collection.mkdirs(name)
426
427     @use_counter
428     @check_update
429     def unlink(self, name):
430         if not self.writable():
431             raise llfuse.FUSEError(errno.EROFS)
432         with llfuse.lock_released:
433             self.collection.remove(name)
434         self.flush()
435
436     @use_counter
437     @check_update
438     def rmdir(self, name):
439         if not self.writable():
440             raise llfuse.FUSEError(errno.EROFS)
441         with llfuse.lock_released:
442             self.collection.remove(name)
443         self.flush()
444
445     @use_counter
446     @check_update
447     def rename(self, name_old, name_new, src):
448         if not self.writable():
449             raise llfuse.FUSEError(errno.EROFS)
450
451         if not isinstance(src, CollectionDirectoryBase):
452             raise llfuse.FUSEError(errno.EPERM)
453
454         if name_new in self:
455             ent = src[name_old]
456             tgt = self[name_new]
457             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
458                 pass
459             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
460                 if len(tgt) > 0:
461                     raise llfuse.FUSEError(errno.ENOTEMPTY)
462             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
463                 raise llfuse.FUSEError(errno.ENOTDIR)
464             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
465                 raise llfuse.FUSEError(errno.EISDIR)
466
467         with llfuse.lock_released:
468             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
469         self.flush()
470         src.flush()
471
472     def clear(self):
473         super(CollectionDirectoryBase, self).clear()
474         if self.collection is not None:
475             self.collection.unsubscribe()
476         self.collection = None
477
478     def objsize(self):
479         # objsize for the whole collection is represented at the root,
480         # don't double-count it
481         return 0
482
483 class CollectionDirectory(CollectionDirectoryBase):
484     """Represents the root of a directory tree representing a collection."""
485
486     __slots__ = ("api", "num_retries", "collection_locator",
487                  "_manifest_size", "_writable", "_updating_lock")
488
489     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
490         super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
491         self.api = api
492         self.num_retries = num_retries
493         self._poll = True
494
495         if isinstance(collection_record, dict):
496             self.collection_locator = collection_record['uuid']
497             self._mtime = convertTime(collection_record.get('modified_at'))
498         else:
499             self.collection_locator = collection_record
500             self._mtime = 0
501
502         is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)
503
504         if is_uuid:
505             # It is a uuid, it may be updated upstream, so recheck it periodically.
506             self._poll_time = 15
507         else:
508             # It is not a uuid.  An immutable collection only
509             # needs to be refreshed if it is very long lived
510             # (long enough that there's a risk of the blob
511             # signatures expiring).
512             try:
513                 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
514             except Exception:
515                 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
516                 self._poll_time = 60*60
517
518         self._writable = is_uuid and enable_write
519         self._manifest_size = 0
520         self._updating_lock = threading.Lock()
521
522     def same(self, i):
523         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
524
525     def writable(self):
526         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
527
528     @use_counter
529     def flush(self):
530         with llfuse.lock_released:
531             with self._updating_lock:
532                 if self.collection.committed():
533                     self.collection.update()
534                 else:
535                     self.collection.save()
536                 self.new_collection_record(self.collection.api_response())
537
538     def want_event_subscribe(self):
539         return (uuid_pattern.match(self.collection_locator) is not None)
540
541     def new_collection(self, new_collection_record, coll_reader):
542         if self.inode:
543             self.clear()
544         self.collection = coll_reader
545         self.new_collection_record(new_collection_record)
546         self.populate(self.mtime())
547
548     def new_collection_record(self, new_collection_record):
549         if not new_collection_record:
550             raise Exception("invalid new_collection_record")
551         self._mtime = convertTime(new_collection_record.get('modified_at'))
552         self._manifest_size = len(new_collection_record["manifest_text"])
553         self.collection_locator = new_collection_record["uuid"]
554         if self.collection_record_file is not None:
555             self.collection_record_file.invalidate()
556             self.inodes.invalidate_inode(self.collection_record_file)
557             _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
558                           self.collection_record_file.inode)
559         self.inodes.update_uuid(self)
560         self.inodes.inode_cache.update_cache_size(self)
561         self.fresh()
562
563     def uuid(self):
564         return self.collection_locator
565
566     @use_counter
567     def update(self):
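        """Fetch or refresh the collection record and rebuild the directory.

        Returns True if the directory is now fresh, False if the update
        failed (in which case the directory is invalidated so a later access
        retries).
        """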
568         try:
569             if self.collection_locator is None:
570                 # No collection locator to retrieve from
571                 self.fresh()
572                 return True
573
574             new_collection_record = None
575             try:
576                 with llfuse.lock_released:
577                     self._updating_lock.acquire()
578                     if not self.stale():
579                         return True
580
581                     _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
582                     coll_reader = None
583                     if self.collection is not None:
584                         # Already have a collection object
585                         self.collection.update()
586                         new_collection_record = self.collection.api_response()
587                     else:
588                         # Create a new collection object
589                         if uuid_pattern.match(self.collection_locator):
590                             coll_reader = arvados.collection.Collection(
591                                 self.collection_locator, self.api, self.api.keep,
592                                 num_retries=self.num_retries)
593                         else:
594                             coll_reader = arvados.collection.CollectionReader(
595                                 self.collection_locator, self.api, self.api.keep,
596                                 num_retries=self.num_retries)
597                         new_collection_record = coll_reader.api_response() or {}
598                         # If the Collection only exists in Keep, there will be no API
599                         # response.  Fill in the fields we need.
600                         if 'uuid' not in new_collection_record:
601                             new_collection_record['uuid'] = self.collection_locator
602                         if "portable_data_hash" not in new_collection_record:
603                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
604                         if 'manifest_text' not in new_collection_record:
605                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
606                         if 'storage_classes_desired' not in new_collection_record:
607                             new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
608
609                 # end with llfuse.lock_released, re-acquire lock
610
611                 if new_collection_record is not None:
612                     if coll_reader is not None:
613                         self.new_collection(new_collection_record, coll_reader)
614                     else:
615                         self.new_collection_record(new_collection_record)
616
617                 return True
618             finally:
619                 self._updating_lock.release()
620         except arvados.errors.NotFoundError as e:
621             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
622         except arvados.errors.ArgumentError as detail:
623             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
624             if new_collection_record is not None and "manifest_text" in new_collection_record:
625                 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
626         except Exception:
627             _logger.exception("arv-mount %s: error", self.collection_locator)
628             if new_collection_record is not None and "manifest_text" in new_collection_record:
629                 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
630         self.invalidate()
631         return False
632
633     @use_counter
634     @check_update
635     def collection_record(self):
636         self.flush()
637         return self.collection.api_response()
638
639     @use_counter
640     @check_update
641     def __getitem__(self, item):
642         if item == '.arvados#collection':
643             if self.collection_record_file is None:
644                 self.collection_record_file = FuncToJSONFile(
645                     self.inode, self.collection_record)
646                 self.inodes.add_entry(self.collection_record_file)
647             self.invalidate()  # use lookup as a signal to force update
648             return self.collection_record_file
649         else:
650             return super(CollectionDirectory, self).__getitem__(item)
651
652     def __contains__(self, k):
653         if k == '.arvados#collection':
654             return True
655         else:
656             return super(CollectionDirectory, self).__contains__(k)
657
658     def invalidate(self):
659         if self.collection_record_file is not None:
660             self.collection_record_file.invalidate()
661             self.inodes.invalidate_inode(self.collection_record_file)
662         super(CollectionDirectory, self).invalidate()
663
664     def persisted(self):
665         return (self.collection_locator is not None)
666
667     def objsize(self):
668         # This is a rough guess of the amount of overhead involved for
669         # a collection; the assumption is that each file averages
670         # 128 bytes in the manifest but consumes 1024 bytes of Python
671         # data structures, so 1024/128=8 means we estimate the RAM
672         # footprint at 8 times the size of the bare manifest text.
673         return self._manifest_size * 8
674
675     def finalize(self):
676         if self.collection is None:
677             return
678
679         if self.writable():
680             try:
681                 self.collection.save()
682             except Exception as e:
683                 _logger.exception("Failed to save collection %s", self.collection_locator)
684         self.collection.stop_threads()
685
686     def clear(self):
687         if self.collection is not None:
688             self.collection.stop_threads()
689         self._manifest_size = 0
690         super(CollectionDirectory, self).clear()
691         if self.collection_record_file is not None:
692             self.inodes.del_entry(self.collection_record_file)
693         self.collection_record_file = None
694
695
696 class TmpCollectionDirectory(CollectionDirectoryBase):
697     """A directory backed by an Arvados collection that never gets saved.
698
699     This supports using Keep as scratch space. A userspace program can
700     read the .arvados#collection file to get a current manifest in
701     order to save a snapshot of the scratch data or use it as a crunch
702     job output.
703     """
704
705     class UnsaveableCollection(arvados.collection.Collection):
706         def save(self):
707             pass
708         def save_new(self):
709             pass
710
711     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
712         collection = self.UnsaveableCollection(
713             api_client=api_client,
714             keep_client=api_client.keep,
715             num_retries=num_retries,
716             storage_classes_desired=storage_classes)
717         # This is always enable_write=True because it never tries to
718         # save to the backend
719         super(TmpCollectionDirectory, self).__init__(
720             parent_inode, inodes, True, filters, collection, self)
721         self.populate(self.mtime())
722
723     def collection_record(self):
724         with llfuse.lock_released:
725             return {
726                 "uuid": None,
727                 "manifest_text": self.collection.manifest_text(),
728                 "portable_data_hash": self.collection.portable_data_hash(),
729                 "storage_classes_desired": self.collection.storage_classes_desired(),
730             }
731
732     def __contains__(self, k):
733         return (k == '.arvados#collection' or
734                 super(TmpCollectionDirectory, self).__contains__(k))
735
736     @use_counter
737     def __getitem__(self, item):
738         if item == '.arvados#collection':
739             if self.collection_record_file is None:
740                 self.collection_record_file = FuncToJSONFile(
741                     self.inode, self.collection_record)
742                 self.inodes.add_entry(self.collection_record_file)
743             return self.collection_record_file
744         return super(TmpCollectionDirectory, self).__getitem__(item)
745
746     def persisted(self):
747         return False
748
749     def writable(self):
750         return True
751
752     def flush(self):
753         pass
754
755     def want_event_subscribe(self):
756         return False
757
758     def finalize(self):
759         self.collection.stop_threads()
760
761     def invalidate(self):
762         if self.collection_record_file:
763             self.collection_record_file.invalidate()
764         super(TmpCollectionDirectory, self).invalidate()
765
766
767 class MagicDirectory(Directory):
768     """A special directory that logically contains the set of all extant keep locators.
769
770     When a name is looked up, it is tested to see if it is a valid keep
771     locator to a manifest, and if so, the manifest contents are loaded as a
772     subdirectory of this directory with the locator as the directory name.
773     Since querying a list of all extant keep locators is impractical, only
774     collections that have already been accessed are visible to readdir().
775
776     """
777
778     README_TEXT = """
779 This directory provides access to Arvados collections as subdirectories listed
780 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
781 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
782 (in the form 'zzzzz-j7d0g-1234567890abcde').
783
784 Note that this directory will appear empty until you attempt to access a
785 specific collection or project subdirectory (such as trying to 'cd' into it),
786 at which point the collection or project will actually be looked up on the server
787 and the directory will appear if it exists.
788
789 """.lstrip()
790
791     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
792         super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
793         self.api = api
794         self.num_retries = num_retries
795         self.pdh_only = pdh_only
796         self.storage_classes = storage_classes
797
798     def __setattr__(self, name, value):
799         super(MagicDirectory, self).__setattr__(name, value)
800         # When we're assigned an inode, add a README.
801         if ((name == 'inode') and (self.inode is not None) and
802               (not self._entries)):
803             self._entries['README'] = self.inodes.add_entry(
804                 StringFile(self.inode, self.README_TEXT, time.time()))
805             # If we're the root directory, add an identical by_id subdirectory.
806             if self.inode == llfuse.ROOT_INODE:
807                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
808                     self.inode,
809                     self.inodes,
810                     self.api,
811                     self.num_retries,
812                     self._enable_write,
813                     self._filters,
814                     self.pdh_only,
815                 ))
816
817     def __contains__(self, k):
818         if k in self._entries:
819             return True
820
821         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
822             return False
823
824         try:
825             e = None
826
827             if group_uuid_pattern.match(k):
828                 project = self.api.groups().list(
829                     filters=[
830                         ['group_class', 'in', ['project','filter']],
831                         ["uuid", "=", k],
832                         *self._filters_for('groups', qualified=False),
833                     ],
834                 ).execute(num_retries=self.num_retries)
835                 if project[u'items_available'] == 0:
836                     return False
837                 e = self.inodes.add_entry(ProjectDirectory(
838                     self.inode,
839                     self.inodes,
840                     self.api,
841                     self.num_retries,
842                     self._enable_write,
843                     self._filters,
844                     project[u'items'][0],
845                     storage_classes=self.storage_classes,
846                 ))
847             else:
848                 e = self.inodes.add_entry(CollectionDirectory(
849                     self.inode,
850                     self.inodes,
851                     self.api,
852                     self.num_retries,
853                     self._enable_write,
854                     self._filters,
855                     k,
856                 ))
857
858             if e.update():
859                 if k not in self._entries:
860                     self._entries[k] = e
861                 else:
862                     self.inodes.del_entry(e)
863                 return True
864             else:
865                 self.inodes.invalidate_entry(self, k)
866                 self.inodes.del_entry(e)
867                 return False
868         except Exception as ex:
869             _logger.exception("arv-mount lookup '%s':", k)
870             if e is not None:
871                 self.inodes.del_entry(e)
872             return False
873
874     def __getitem__(self, item):
875         if item in self:
876             return self._entries[item]
877         else:
878             raise KeyError("No collection with id " + item)
879
880     def clear(self):
881         pass
882
883     def want_event_subscribe(self):
884         return not self.pdh_only
885
886
887 class TagsDirectory(Directory):
888     """A special directory that contains as subdirectories all tags visible to the user."""
889
890     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
891         super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
892         self.api = api
893         self.num_retries = num_retries
894         self._poll = True
895         self._poll_time = poll_time
896         self._extra = set()
897
898     def want_event_subscribe(self):
899         return True
900
901     @use_counter
902     def update(self):
903         with llfuse.lock_released:
904             tags = self.api.links().list(
905                 filters=[
906                     ['link_class', '=', 'tag'],
907                     ['name', '!=', ''],
908                     *self._filters_for('links', qualified=False),
909                 ],
910                 select=['name'],
911                 distinct=True,
912                 limit=1000,
913             ).execute(num_retries=self.num_retries)
914         if "items" in tags:
915             self.merge(
916                 tags['items']+[{"name": n} for n in self._extra],
917                 lambda i: i['name'],
918                 lambda a, i: a.tag == i['name'],
919                 lambda i: TagDirectory(
920                     self.inode,
921                     self.inodes,
922                     self.api,
923                     self.num_retries,
924                     self._enable_write,
925                     self._filters,
926                     i['name'],
927                     poll=self._poll,
928                     poll_time=self._poll_time,
929                 ),
930             )
931
932     @use_counter
933     @check_update
934     def __getitem__(self, item):
935         if super(TagsDirectory, self).__contains__(item):
936             return super(TagsDirectory, self).__getitem__(item)
937         with llfuse.lock_released:
938             tags = self.api.links().list(
939                 filters=[
940                     ['link_class', '=', 'tag'],
941                     ['name', '=', item],
942                     *self._filters_for('links', qualified=False),
943                 ],
944                 limit=1,
945             ).execute(num_retries=self.num_retries)
946         if tags["items"]:
947             self._extra.add(item)
948             self.update()
949         return super(TagsDirectory, self).__getitem__(item)
950
951     @use_counter
952     @check_update
953     def __contains__(self, k):
954         if super(TagsDirectory, self).__contains__(k):
955             return True
956         try:
957             self[k]
958             return True
959         except KeyError:
960             pass
961         return False
962
963
964 class TagDirectory(Directory):
965     """A special directory that contains as subdirectories all collections visible
966     to the user that are tagged with a particular tag.
967     """
968
969     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
970                  poll=False, poll_time=60):
971         super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
972         self.api = api
973         self.num_retries = num_retries
974         self.tag = tag
975         self._poll = poll
976         self._poll_time = poll_time
977
978     def want_event_subscribe(self):
979         return True
980
981     @use_counter
982     def update(self):
983         with llfuse.lock_released:
984             taggedcollections = self.api.links().list(
985                 filters=[
986                     ['link_class', '=', 'tag'],
987                     ['name', '=', self.tag],
988                     ['head_uuid', 'is_a', 'arvados#collection'],
989                     *self._filters_for('links', qualified=False),
990                 ],
991                 select=['head_uuid'],
992             ).execute(num_retries=self.num_retries)
993         self.merge(
994             taggedcollections['items'],
995             lambda i: i['head_uuid'],
996             lambda a, i: a.collection_locator == i['head_uuid'],
997             lambda i: CollectionDirectory(
998                 self.inode,
999                 self.inodes,
1000                 self.api,
1001                 self.num_retries,
1002                 self._enable_write,
1003                 self._filters,
1004                 i['head_uuid'],
1005             ),
1006         )
1007
1008
1009 class ProjectDirectory(Directory):
1010     """A special directory that contains the contents of a project."""
1011
1012     __slots__ = ("api", "num_retries", "project_object", "project_object_file",
1013                  "project_uuid", "_updating_lock",
1014                  "_current_user", "_full_listing", "storage_classes", "recursively_contained")
1015
1016     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1017                  project_object, poll=True, poll_time=15, storage_classes=None):
1018         super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1019         self.api = api
1020         self.num_retries = num_retries
1021         self.project_object = project_object
1022         self.project_object_file = None
1023         self.project_uuid = project_object['uuid']
1024         self._poll = poll
1025         self._poll_time = poll_time
1026         self._updating_lock = threading.Lock()
1027         self._current_user = None
1028         self._full_listing = False
1029         self.storage_classes = storage_classes
1030         self.recursively_contained = False
1031
1032         # Filter groups can contain themselves, which causes tools
1033         # that walk the filesystem to get stuck in an infinite loop,
1034         # so suppress returning a listing in that case.
1035         if self.project_object.get("group_class") == "filter":
1036             iter_parent_inode = parent_inode
1037             while iter_parent_inode != llfuse.ROOT_INODE:
1038                 parent_dir = self.inodes[iter_parent_inode]
1039                 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1040                     self.recursively_contained = True
1041                     break
1042                 iter_parent_inode = parent_dir.parent_inode
1043
1044     def want_event_subscribe(self):
1045         return True
1046
1047     def createDirectory(self, i):
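        """Create the FUSE object for one project content item, based on its uuid.

        Returns a CollectionDirectory for collections (and name links to
        collections), a ProjectDirectory for subprojects, an ObjectFile for
        other Arvados objects, or None if the item should not appear.
        """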
1048         common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1049         if collection_uuid_pattern.match(i['uuid']):
1050             return CollectionDirectory(*common_args, i)
1051         elif group_uuid_pattern.match(i['uuid']):
1052             return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1053         elif link_uuid_pattern.match(i['uuid']):
1054             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1055                 return CollectionDirectory(*common_args, i['head_uuid'])
1056             else:
1057                 return None
1058         elif uuid_pattern.match(i['uuid']):
1059             return ObjectFile(self.parent_inode, i)
1060         else:
1061             return None
1062
1063     def uuid(self):
1064         return self.project_uuid
1065
1066     def items(self):
1067         self._full_listing = True
1068         return super(ProjectDirectory, self).items()
1069
1070     def namefn(self, i):
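        """Return the directory entry name for a project content item, or None to skip it."""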
1071         if 'name' in i:
1072             if i['name'] is None or len(i['name']) == 0:
1073                 return None
1074             elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1075                 # collection or subproject
1076                 return i['name']
1077             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1078                 # name link
1079                 return i['name']
1080             elif 'kind' in i and i['kind'].startswith('arvados#'):
1081                 # something else
1082                 return "{}.{}".format(i['name'], i['kind'][8:])
1083         else:
1084             return None
1085
1086
1087     @use_counter
1088     def update(self):
1089         if self.project_object_file is None:
1090             self.project_object_file = ObjectFile(self.inode, self.project_object)
1091             self.inodes.add_entry(self.project_object_file)
1092
1093         if self.recursively_contained or not self._full_listing:
1094             return True
1095
1096         def samefn(a, i):
1097             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1098                 return a.uuid() == i['uuid']
1099             elif isinstance(a, ObjectFile):
1100                 return a.uuid() == i['uuid'] and not a.stale()
1101             return False
1102
1103         try:
1104             with llfuse.lock_released:
1105                 self._updating_lock.acquire()
1106                 if not self.stale():
1107                     return
1108
1109                 if group_uuid_pattern.match(self.project_uuid):
1110                     self.project_object = self.api.groups().get(
1111                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
1112                 elif user_uuid_pattern.match(self.project_uuid):
1113                     self.project_object = self.api.users().get(
1114                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
1115                 # do this in 2 steps until #17424 is fixed
1116                 contents = list(arvados.util.keyset_list_all(
1117                     self.api.groups().contents,
1118                     order_key='uuid',
1119                     num_retries=self.num_retries,
1120                     uuid=self.project_uuid,
1121                     filters=[
1122                         ['uuid', 'is_a', 'arvados#group'],
1123                         ['groups.group_class', 'in', ['project', 'filter']],
1124                         *self._filters_for('groups', qualified=True),
1125                     ],
1126                 ))
1127                 contents.extend(obj for obj in arvados.util.keyset_list_all(
1128                     self.api.groups().contents,
1129                     order_key='uuid',
1130                     num_retries=self.num_retries,
1131                     uuid=self.project_uuid,
1132                     filters=[
1133                         ['uuid', 'is_a', 'arvados#collection'],
1134                         *self._filters_for('collections', qualified=True),
1135                     ],
1136                 ) if obj['current_version_uuid'] == obj['uuid'])
1137             # end with llfuse.lock_released, re-acquire lock
1138
1139             self.merge(contents,
1140                        self.namefn,
1141                        samefn,
1142                        self.createDirectory)
1143             return True
1144         finally:
1145             self._updating_lock.release()
1146
1147     def _add_entry(self, i, name):
1148         ent = self.createDirectory(i)
1149         self._entries[name] = self.inodes.add_entry(ent)
1150         return self._entries[name]
1151
1152     @use_counter
1153     @check_update
1154     def __getitem__(self, k):
1155         if k == '.arvados#project':
1156             return self.project_object_file
1157         elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1158             return super(ProjectDirectory, self).__getitem__(k)
1159         with llfuse.lock_released:
1160             k2 = self.unsanitize_filename(k)
1161             if k2 == k:
1162                 namefilter = ["name", "=", k]
1163             else:
1164                 namefilter = ["name", "in", [k, k2]]
1165             contents = self.api.groups().list(
1166                 filters=[
1167                     ["owner_uuid", "=", self.project_uuid],
1168                     ["group_class", "in", ["project","filter"]],
1169                     namefilter,
1170                     *self._filters_for('groups', qualified=False),
1171                 ],
1172                 limit=2,
1173             ).execute(num_retries=self.num_retries)["items"]
1174             if not contents:
1175                 contents = self.api.collections().list(
1176                     filters=[
1177                         ["owner_uuid", "=", self.project_uuid],
1178                         namefilter,
1179                         *self._filters_for('collections', qualified=False),
1180                     ],
1181                     limit=2,
1182                 ).execute(num_retries=self.num_retries)["items"]
1183         if contents:
1184             if len(contents) > 1 and contents[1]['name'] == k:
1185                 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1186                 # "foo[SUBST]bar".
1187                 contents = [contents[1]]
1188             name = self.sanitize_filename(self.namefn(contents[0]))
1189             if name != k:
1190                 raise KeyError(k)
1191             return self._add_entry(contents[0], name)
1192
1193         # Didn't find item
1194         raise KeyError(k)
1195
1196     def __contains__(self, k):
1197         if k == '.arvados#project':
1198             return True
1199         try:
1200             self[k]
1201             return True
1202         except KeyError:
1203             pass
1204         return False
1205
1206     @use_counter
1207     @check_update
1208     def writable(self):
1209         if not self._enable_write:
1210             return False
1211         return self.project_object.get("can_write") is True
1212
1213     def persisted(self):
1214         return True
1215
1216     def clear(self):
1217         super(ProjectDirectory, self).clear()
1218         if self.project_object_file is not None:
1219             self.inodes.del_entry(self.project_object_file)
1220         self.project_object_file = None
1221
1222     @use_counter
1223     @check_update
1224     def mkdir(self, name):
1225         if not self.writable():
1226             raise llfuse.FUSEError(errno.EROFS)
1227
1228         try:
1229             with llfuse.lock_released:
1230                 c = {
1231                     "owner_uuid": self.project_uuid,
1232                     "name": name,
1233                     "manifest_text": "" }
1234                 if self.storage_classes is not None:
1235                     c["storage_classes_desired"] = self.storage_classes
1236                 try:
1237                     self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1238                 except Exception as e:
1239                     raise
1240             self.invalidate()
1241         except apiclient_errors.Error as error:
1242             _logger.error(error)
1243             raise llfuse.FUSEError(errno.EEXIST)
1244
1245     @use_counter
1246     @check_update
1247     def rmdir(self, name):
1248         if not self.writable():
1249             raise llfuse.FUSEError(errno.EROFS)
1250
1251         if name not in self:
1252             raise llfuse.FUSEError(errno.ENOENT)
1253         if not isinstance(self[name], CollectionDirectory):
1254             raise llfuse.FUSEError(errno.EPERM)
1255         if len(self[name]) > 0:
1256             raise llfuse.FUSEError(errno.ENOTEMPTY)
1257         with llfuse.lock_released:
1258             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1259         self.invalidate()
1260
1261     @use_counter
1262     @check_update
1263     def rename(self, name_old, name_new, src):
1264         if not self.writable():
1265             raise llfuse.FUSEError(errno.EROFS)
1266
1267         if not isinstance(src, ProjectDirectory):
1268             raise llfuse.FUSEError(errno.EPERM)
1269
1270         ent = src[name_old]
1271
1272         if not isinstance(ent, CollectionDirectory):
1273             raise llfuse.FUSEError(errno.EPERM)
1274
1275         if name_new in self:
1276             # POSIX semantics for replacing one directory with another are
1277             # tricky (the target directory must be empty, and the operation
1278             # must be atomic, which isn't possible with the Arvados API as
1279             # of this writing), so don't support that.
1280             raise llfuse.FUSEError(errno.EPERM)
1281
1282         self.api.collections().update(uuid=ent.uuid(),
1283                                       body={"owner_uuid": self.uuid(),
1284                                             "name": name_new}).execute(num_retries=self.num_retries)
1285
1286         # Actually move the entry from the source directory to this directory.
1287         del src._entries[name_old]
1288         self._entries[name_new] = ent
1289         self.inodes.invalidate_entry(src, name_old)
1290
1291     @use_counter
1292     def child_event(self, ev):
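        """Apply a change event for an object in this project to the directory.

        Events (for example from the websocket event stream) carry old and
        new attributes; entries are added, removed, or renamed accordingly.
        """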
1293         properties = ev.get("properties") or {}
1294         old_attrs = properties.get("old_attributes") or {}
1295         new_attrs = properties.get("new_attributes") or {}
1296         old_attrs["uuid"] = ev["object_uuid"]
1297         new_attrs["uuid"] = ev["object_uuid"]
1298         old_name = self.sanitize_filename(self.namefn(old_attrs))
1299         new_name = self.sanitize_filename(self.namefn(new_attrs))
1300
1301         # Create events will have a new name but not an old name.
1302         # Delete events will have an old name but not a new name.
1303         # Update events will have both an old and a new name, which may be the same or different;
1304         # if they are the same, an unrelated field changed and there is nothing to do.
1305
1306         if old_attrs.get("owner_uuid") != self.project_uuid:
1307             # Was moved from somewhere else, so don't try to remove entry.
1308             old_name = None
1309         if ev.get("object_owner_uuid") != self.project_uuid:
1310             # Was moved to somewhere else, so don't try to add entry
1311             new_name = None
1312
1313         if old_attrs.get("is_trashed"):
1314             # Was previously deleted
1315             old_name = None
1316         if new_attrs.get("is_trashed"):
1317             # Has been deleted
1318             new_name = None
1319
1320         if new_name != old_name:
1321             ent = None
1322             if old_name in self._entries:
1323                 ent = self._entries[old_name]
1324                 del self._entries[old_name]
1325                 self.inodes.invalidate_entry(self, old_name)
1326
1327             if new_name:
1328                 if ent is not None:
1329                     self._entries[new_name] = ent
1330                 else:
1331                     self._add_entry(new_attrs, new_name)
1332             elif ent is not None:
1333                 self.inodes.del_entry(ent)
1334
1335
1336 class SharedDirectory(Directory):
1337     """A special directory that represents users or groups who have shared projects with me."""
1338
1339     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1340                  exclude, poll=False, poll_time=60, storage_classes=None):
1341         super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1342         self.api = api
1343         self.num_retries = num_retries
1344         self.current_user = api.users().current().execute(num_retries=num_retries)
1345         self._poll = True
1346         self._poll_time = poll_time
1347         self._updating_lock = threading.Lock()
1348         self.storage_classes = storage_classes
1349
1350     @use_counter
1351     def update(self):
1352         try:
1353             with llfuse.lock_released:
1354                 self._updating_lock.acquire()
1355                 if not self.stale():
1356                     return
1357
1358                 contents = {}
1359                 roots = []
1360                 root_owners = set()
1361                 objects = {}
1362
1363                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1364                 if 'httpMethod' in methods.get('shared', {}):
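                    # groups().shared is paged manually: each request asks for
                    # uuids greater than the last uuid seen on the previous
                    # page, and the loop stops when an empty page comes back.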
1365                     page = []
1366                     while True:
1367                         resp = self.api.groups().shared(
1368                             filters=[
1369                                 ['group_class', 'in', ['project','filter']],
1370                                 *page,
1371                                 *self._filters_for('groups', qualified=False),
1372                             ],
1373                             order="uuid",
1374                             limit=10000,
1375                             count="none",
1376                             include="owner_uuid",
1377                         ).execute()
1378                         if not resp["items"]:
1379                             break
1380                         page = [["uuid", ">", resp["items"][-1]["uuid"]]]
1381                         for r in resp["items"]:
1382                             objects[r["uuid"]] = r
1383                             roots.append(r["uuid"])
1384                         for r in resp["included"]:
1385                             objects[r["uuid"]] = r
1386                             root_owners.add(r["uuid"])
1387                 else:
1388                     all_projects = list(arvados.util.keyset_list_all(
1389                         self.api.groups().list,
1390                         order_key="uuid",
1391                         num_retries=self.num_retries,
1392                         filters=[
1393                             ['group_class', 'in', ['project','filter']],
1394                             *self._filters_for('groups', qualified=False),
1395                         ],
1396                         select=["uuid", "owner_uuid"],
1397                     ))
1398                     for ob in all_projects:
1399                         objects[ob['uuid']] = ob
1400
1401                     current_uuid = self.current_user['uuid']
1402                     for ob in all_projects:
1403                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1404                             roots.append(ob['uuid'])
1405                             root_owners.add(ob['owner_uuid'])
1406
1407                     lusers = arvados.util.keyset_list_all(
1408                         self.api.users().list,
1409                         order_key="uuid",
1410                         num_retries=self.num_retries,
1411                         filters=[
1412                             ['uuid', 'in', list(root_owners)],
1413                             *self._filters_for('users', qualified=False),
1414                         ],
1415                     )
1416                     lgroups = arvados.util.keyset_list_all(
1417                         self.api.groups().list,
1418                         order_key="uuid",
1419                         num_retries=self.num_retries,
1420                         filters=[
1421                             ['uuid', 'in', list(root_owners)+roots],
1422                             *self._filters_for('groups', qualified=False),
1423                         ],
1424                     )
1425                     for l in lusers:
1426                         objects[l["uuid"]] = l
1427                     for l in lgroups:
1428                         objects[l["uuid"]] = l
1429
1430                 for r in root_owners:
1431                     if r in objects:
1432                         obr = objects[r]
1433                         if obr.get("name"):
1434                             contents[obr["name"]] = obr
1435                         elif "first_name" in obr:
1436                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1437
1438                 for r in roots:
1439                     if r in objects:
1440                         obr = objects[r]
1441                         if obr['owner_uuid'] not in objects:
1442                             contents[obr["name"]] = obr
1443
1444             # end with llfuse.lock_released, re-acquire lock
1445
1446             self.merge(
1447                 contents.items(),
1448                 lambda i: i[0],
1449                 lambda a, i: a.uuid() == i[1]['uuid'],
1450                 lambda i: ProjectDirectory(
1451                     self.inode,
1452                     self.inodes,
1453                     self.api,
1454                     self.num_retries,
1455                     self._enable_write,
1456                     self._filters,
1457                     i[1],
1458                     poll=self._poll,
1459                     poll_time=self._poll_time,
1460                     storage_classes=self.storage_classes,
1461                 ),
1462             )
1463         except Exception:
1464             _logger.exception("arv-mount shared dir error")
1465         finally:
1466             self._updating_lock.release()
1467
1468     def want_event_subscribe(self):
1469         return True