Merge branch '22000-deleting-favorites' into main. Closes #22000
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
32 class Directory(FreshBase):
33     """Generic directory object, backed by a dict.
34
35     Consists of a set of entries with the key representing the filename
36     and the value referencing a File or Directory object.
37     """
38
39     __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
40
41     def __init__(self, parent_inode, inodes, enable_write, filters):
42         """parent_inode is the integer inode number"""
43
44         super(Directory, self).__init__()
45
46         self.inode = None
47         if not isinstance(parent_inode, int):
48             raise Exception("parent_inode should be an int")
49         self.parent_inode = parent_inode
50         self.inodes = inodes
51         self._entries = {}
52         self._mtime = time.time()
53         self._enable_write = enable_write
54         self._filters = filters or []
55
56     def _filters_for(self, subtype, *, qualified):
57         for f in self._filters:
58             f_type, _, f_name = f[0].partition('.')
59             if not f_name:
60                 yield f
61             elif f_type != subtype:
62                 pass
63             elif qualified:
64                 yield f
65             else:
66                 yield [f_name, *f[1:]]
67
68     def unsanitize_filename(self, incoming):
69         """Replace ForwardSlashNameSubstitution value with /"""
70         fsns = self.inodes.forward_slash_subst()
71         if isinstance(fsns, str):
72             return incoming.replace(fsns, '/')
73         else:
74             return incoming
75
76     def sanitize_filename(self, dirty):
77         """Replace disallowed filename characters according to
78         ForwardSlashNameSubstitution in self.api_config."""
79         # '.' and '..' are not reachable if API server is newer than #6277
80         if dirty is None:
81             return None
82         elif dirty == '':
83             return '_'
84         elif dirty == '.':
85             return '_'
86         elif dirty == '..':
87             return '__'
88         else:
89             fsns = self.inodes.forward_slash_subst()
90             if isinstance(fsns, str):
91                 dirty = dirty.replace('/', fsns)
92             return _disallowed_filename_characters.sub('_', dirty)
93
94
    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        """Do nothing; the base class has no backing data to refresh from."""
        pass
100
    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        """Return 0 (stub)."""
        return 0
105
    def persisted(self):
        """Return False in the base class."""
        return False
108
109     def checkupdate(self):
110         if self.stale():
111             try:
112                 self.update()
113             except apiclient.errors.HttpError as e:
114                 _logger.warn(e)
115
    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry stored under `item`; raises KeyError if absent."""
        return self._entries[item]
120
    @use_counter
    @check_update
    def items(self):
        """Return a list copy of the (name, entry) pairs in this directory."""
        return list(self._entries.items())
125
    @use_counter
    @check_update
    def __contains__(self, k):
        """Return whether an entry named `k` exists in this directory."""
        return k in self._entries
130
    @use_counter
    @check_update
    def __len__(self):
        """Return the number of entries in this directory."""
        return len(self._entries)
135
    def fresh(self):
        """Call `self.inodes.touch(self)`, then the superclass fresh()."""
        self.inodes.touch(self)
        super(Directory, self).fresh()
139
    def objsize(self):
        """Return an estimated memory footprint: 1024 per directory entry."""
        # Rough estimate of memory footprint based on using pympler
        return len(self._entries) * 1024
143
144     def merge(self, items, fn, same, new_entry):
145         """Helper method for updating the contents of the directory.
146
147         Takes a list describing the new contents of the directory, reuse
148         entries that are the same in both the old and new lists, create new
149         entries, and delete old entries missing from the new list.
150
151         Arguments:
152         * items: Iterable --- New directory contents
153
154         * fn: Callable --- Takes an entry in 'items' and return the desired file or
155         directory name, or None if this entry should be skipped
156
157         * same: Callable --- Compare an existing entry (a File or Directory
158         object) with an entry in the items list to determine whether to keep
159         the existing entry.
160
161         * new_entry: Callable --- Create a new directory entry (File or Directory
162         object) from an entry in the items list.
163
164         """
165
166         oldentries = self._entries
167         self._entries = {}
168         changed = False
169         for i in items:
170             name = self.sanitize_filename(fn(i))
171             if not name:
172                 continue
173             if name in oldentries:
174                 ent = oldentries[name]
175                 if same(ent, i) and ent.parent_inode == self.inode:
176                     # move existing directory entry over
177                     self._entries[name] = ent
178                     del oldentries[name]
179                     self.inodes.inode_cache.touch(ent)
180
181         for i in items:
182             name = self.sanitize_filename(fn(i))
183             if not name:
184                 continue
185             if name not in self._entries:
186                 # create new directory entry
187                 ent = new_entry(i)
188                 if ent is not None:
189                     self._entries[name] = self.inodes.add_entry(ent)
190                     # need to invalidate this just in case there was a
191                     # previous entry that couldn't be moved over or a
192                     # lookup that returned file not found and cached
193                     # a negative result
194                     self.inodes.invalidate_entry(self, name)
195                     changed = True
196                 _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
197
198         # delete any other directory entries that were not in found in 'items'
199         for name, ent in oldentries.items():
200             _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
201             self.inodes.invalidate_entry(self, name)
202             self.inodes.del_entry(ent)
203             changed = True
204
205         if changed:
206             self._mtime = time.time()
207             self.inodes.inode_cache.update_cache_size(self)
208
209         self.fresh()
210
211     def in_use(self):
212         if super(Directory, self).in_use():
213             return True
214         for v in self._entries.values():
215             if v.in_use():
216                 return True
217         return False
218
219     def clear(self):
220         """Delete all entries"""
221         if not self._entries:
222             return
223         oldentries = self._entries
224         self._entries = {}
225         self.invalidate()
226         for name, ent in oldentries.items():
227             ent.clear()
228             self.inodes.invalidate_entry(self, name)
229             self.inodes.del_entry(ent)
230         self.inodes.inode_cache.update_cache_size(self)
231
232     def kernel_invalidate(self):
233         # Invalidating the dentry on the parent implies invalidating all paths
234         # below it as well.
235         if self.parent_inode in self.inodes:
236             parent = self.inodes[self.parent_inode]
237         else:
238             # parent was removed already.
239             return
240
241         # Find self on the parent in order to invalidate this path.
242         # Calling the public items() method might trigger a refresh,
243         # which we definitely don't want, so read the internal dict directly.
244         for k,v in parent._entries.items():
245             if v is self:
246                 self.inodes.invalidate_entry(parent, k)
247                 break
248
    def mtime(self):
        """Return the stored modification time (`self._mtime`)."""
        return self._mtime
251
    def writable(self):
        """Return False in the base class."""
        return False
254
    def flush(self):
        """Do nothing in the base class."""
        pass
257
    def want_event_subscribe(self):
        """Not implemented in the base class; raises NotImplementedError."""
        raise NotImplementedError()
260
    def create(self, name):
        """Not implemented in the base class; raises NotImplementedError."""
        raise NotImplementedError()
263
    def mkdir(self, name):
        """Not implemented in the base class; raises NotImplementedError."""
        raise NotImplementedError()
266
    def unlink(self, name):
        """Not implemented in the base class; raises NotImplementedError."""
        raise NotImplementedError()
269
    def rmdir(self, name):
        """Not implemented in the base class; raises NotImplementedError."""
        raise NotImplementedError()
272
    def rename(self, name_old, name_new, src):
        """Not implemented in the base class; raises NotImplementedError."""
        raise NotImplementedError()
275
276
277 class CollectionDirectoryBase(Directory):
278     """Represent an Arvados Collection as a directory.
279
280     This class is used for Subcollections, and is also the base class for
281     CollectionDirectory, which implements collection loading/saving on
282     Collection records.
283
    Most operations act only on the underlying Arvados `Collection` object.  The
285     `Collection` object signals via a notify callback to
286     `CollectionDirectoryBase.on_event` that an item was added, removed or
287     modified.  FUSE inodes and directory entries are created, deleted or
288     invalidated in response to these events.
289
290     """
291
292     __slots__ = ("collection", "collection_root", "collection_record_file")
293
294     def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
295         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
296         self.collection = collection
297         self.collection_root = collection_root
298         self.collection_record_file = None
299
300     def new_entry(self, name, item, mtime):
301         name = self.sanitize_filename(name)
302         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
303             if item.fuse_entry.parent_inode is not None:
304                 raise Exception("Can only reparent unparented inode entry")
305             if item.fuse_entry.inode is None:
306                 raise Exception("Reparented entry must still have valid inode")
307             item.fuse_entry.parent_inode = self.inode
308             self._entries[name] = item.fuse_entry
309         elif isinstance(item, arvados.collection.RichCollectionBase):
310             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
311                 self.inode,
312                 self.inodes,
313                 self._enable_write,
314                 self._filters,
315                 item,
316                 self.collection_root,
317             ))
318             self._entries[name].populate(mtime)
319         else:
320             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
321         item.fuse_entry = self._entries[name]
322
    def on_event(self, event, collection, name, item):
        """Apply a Collection change notification to this directory.

        Events for a collection other than `self.collection` are ignored.

        Arguments:
        * event --- compared against arvados.collection.ADD, DEL and MOD
        * collection --- the collection the event refers to
        * name --- entry name; sanitized before use
        * item --- the collection item the event refers to

        ADD creates an entry via new_entry(); DEL removes the entry and
        raises KeyError if `name` is not present; MOD invalidates the
        inode of the affected entry.  If a collection record file exists
        it is invalidated after any event.
        """
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            #
            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock.  Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock.  If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            #
            lockcount = 0
            try:
                while True:
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                # release() raises once this thread no longer holds the lock
                pass

            try:
                with llfuse.lock:
                    with self.collection.lock:
                        if event == arvados.collection.ADD:
                            self.new_entry(name, item, self.mtime())
                        elif event == arvados.collection.DEL:
                            ent = self._entries[name]
                            del self._entries[name]
                            self.inodes.invalidate_entry(self, name)
                            self.inodes.del_entry(ent)
                        elif event == arvados.collection.MOD:
                            if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                                self.inodes.invalidate_inode(item.fuse_entry)
                            elif name in self._entries:
                                self.inodes.invalidate_inode(self._entries[name])

                        if self.collection_record_file is not None:
                            self.collection_record_file.invalidate()
                            self.inodes.invalidate_inode(self.collection_record_file)
            finally:
                # Restore the collection lock to the depth it had on entry.
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1
380
381     def populate(self, mtime):
382         self._mtime = mtime
383         with self.collection.lock:
384             self.collection.subscribe(self.on_event)
385             for entry, item in self.collection.items():
386                 self.new_entry(entry, item, self.mtime())
387
    def writable(self):
        """Return a true value only if writes are enabled and the collection is writable."""
        return self._enable_write and self.collection.writable()
390
    @use_counter
    def flush(self):
        """Delegate to `self.collection_root.flush()`."""
        self.collection_root.flush()
394
395     @use_counter
396     @check_update
397     def create(self, name):
398         if not self.writable():
399             raise llfuse.FUSEError(errno.EROFS)
400         with llfuse.lock_released:
401             self.collection.open(name, "w").close()
402
    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory `name` via `collection.mkdirs`.

        Raises llfuse.FUSEError(EROFS) if this directory is not writable.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)
410
    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush.

        Raises llfuse.FUSEError(EROFS) if this directory is not writable.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()
419
    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush.

        Raises llfuse.FUSEError(EROFS) if this directory is not writable.
        The body is the same as unlink().
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()
428
429     @use_counter
430     @check_update
431     def rename(self, name_old, name_new, src):
432         if not self.writable():
433             raise llfuse.FUSEError(errno.EROFS)
434
435         if not isinstance(src, CollectionDirectoryBase):
436             raise llfuse.FUSEError(errno.EPERM)
437
438         if name_new in self:
439             ent = src[name_old]
440             tgt = self[name_new]
441             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
442                 pass
443             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
444                 if len(tgt) > 0:
445                     raise llfuse.FUSEError(errno.ENOTEMPTY)
446             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
447                 raise llfuse.FUSEError(errno.ENOTDIR)
448             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
449                 raise llfuse.FUSEError(errno.EISDIR)
450
451         with llfuse.lock_released:
452             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
453         self.flush()
454         src.flush()
455
456     def clear(self):
457         super(CollectionDirectoryBase, self).clear()
458         if self.collection is not None:
459             self.collection.unsubscribe()
460         self.collection = None
461
    def objsize(self):
        """Return 0."""
        # objsize for the whole collection is represented at the root,
        # don't double-count it
        return 0
466
467 class CollectionDirectory(CollectionDirectoryBase):
468     """Represents the root of a directory tree representing a collection."""
469
470     __slots__ = ("api", "num_retries", "collection_locator",
471                  "_manifest_size", "_writable", "_updating_lock")
472
473     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
474         super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
475         self.api = api
476         self.num_retries = num_retries
477         self._poll = True
478         try:
479             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
480         except:
481             _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
482             self._poll_time = 60*60
483
484         if isinstance(collection_record, dict):
485             self.collection_locator = collection_record['uuid']
486             self._mtime = convertTime(collection_record.get('modified_at'))
487         else:
488             self.collection_locator = collection_record
489             self._mtime = 0
490         self._manifest_size = 0
491         if self.collection_locator:
492             self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
493         self._updating_lock = threading.Lock()
494
495     def same(self, i):
496         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
497
498     def writable(self):
499         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
500
501     @use_counter
502     def flush(self):
503         if not self.writable():
504             return
505         with llfuse.lock_released:
506             with self._updating_lock:
507                 if self.collection.committed():
508                     self.collection.update()
509                 else:
510                     self.collection.save()
511                 self.new_collection_record(self.collection.api_response())
512
513     def want_event_subscribe(self):
514         return (uuid_pattern.match(self.collection_locator) is not None)
515
    def new_collection(self, new_collection_record, coll_reader):
        """Replace the backing collection with `coll_reader` and rebuild entries.

        Existing entries are cleared first, but only if this directory has
        an inode.  The record is applied via new_collection_record()
        before the directory is repopulated.
        """
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())
522
    def new_collection_record(self, new_collection_record):
        """Update cached metadata from a collection record dict.

        Reads 'modified_at', 'manifest_text' and 'uuid' from the record;
        'manifest_text' and 'uuid' are required keys.  Invalidates the
        collection record file if one exists, then updates the inode
        table's uuid mapping and cache size and marks this directory fresh.

        Raises Exception if the record is empty or None.
        """
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
                          self.collection_record_file.inode)
        self.inodes.update_uuid(self)
        self.inodes.inode_cache.update_cache_size(self)
        self.fresh()
537
    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator
540
541     @use_counter
542     def update(self):
543         try:
544             if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
545                 # It's immutable, nothing to update
546                 return True
547
548             if self.collection_locator is None:
549                 # No collection locator to retrieve from
550                 self.fresh()
551                 return True
552
553             new_collection_record = None
554             try:
555                 with llfuse.lock_released:
556                     self._updating_lock.acquire()
557                     if not self.stale():
558                         return True
559
560                     _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
561                     coll_reader = None
562                     if self.collection is not None:
563                         # Already have a collection object
564                         self.collection.update()
565                         new_collection_record = self.collection.api_response()
566                     else:
567                         # Create a new collection object
568                         if uuid_pattern.match(self.collection_locator):
569                             coll_reader = arvados.collection.Collection(
570                                 self.collection_locator, self.api, self.api.keep,
571                                 num_retries=self.num_retries)
572                         else:
573                             coll_reader = arvados.collection.CollectionReader(
574                                 self.collection_locator, self.api, self.api.keep,
575                                 num_retries=self.num_retries)
576                         new_collection_record = coll_reader.api_response() or {}
577                         # If the Collection only exists in Keep, there will be no API
578                         # response.  Fill in the fields we need.
579                         if 'uuid' not in new_collection_record:
580                             new_collection_record['uuid'] = self.collection_locator
581                         if "portable_data_hash" not in new_collection_record:
582                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
583                         if 'manifest_text' not in new_collection_record:
584                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
585                         if 'storage_classes_desired' not in new_collection_record:
586                             new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
587
588                 # end with llfuse.lock_released, re-acquire lock
589
590                 if new_collection_record is not None:
591                     if coll_reader is not None:
592                         self.new_collection(new_collection_record, coll_reader)
593                     else:
594                         self.new_collection_record(new_collection_record)
595
596                 return True
597             finally:
598                 self._updating_lock.release()
599         except arvados.errors.NotFoundError as e:
600             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
601         except arvados.errors.ArgumentError as detail:
602             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
603             if new_collection_record is not None and "manifest_text" in new_collection_record:
604                 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
605         except Exception:
606             _logger.exception("arv-mount %s: error", self.collection_locator)
607             if new_collection_record is not None and "manifest_text" in new_collection_record:
608                 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
609         self.invalidate()
610         return False
611
    @use_counter
    @check_update
    def collection_record(self):
        """Flush pending changes, then return the collection's API response."""
        self.flush()
        return self.collection.api_response()
617
618     @use_counter
619     @check_update
620     def __getitem__(self, item):
621         if item == '.arvados#collection':
622             if self.collection_record_file is None:
623                 self.collection_record_file = FuncToJSONFile(
624                     self.inode, self.collection_record)
625                 self.inodes.add_entry(self.collection_record_file)
626             self.invalidate()  # use lookup as a signal to force update
627             return self.collection_record_file
628         else:
629             return super(CollectionDirectory, self).__getitem__(item)
630
631     def __contains__(self, k):
632         if k == '.arvados#collection':
633             return True
634         else:
635             return super(CollectionDirectory, self).__contains__(k)
636
637     def invalidate(self):
638         if self.collection_record_file is not None:
639             self.collection_record_file.invalidate()
640             self.inodes.invalidate_inode(self.collection_record_file)
641         super(CollectionDirectory, self).invalidate()
642
    def persisted(self):
        """Return True when a collection locator is set."""
        return (self.collection_locator is not None)
645
    def objsize(self):
        """Return the estimated memory footprint: 8 times the manifest size."""
        # This is a rough guess of the amount of overhead involved for
        # a collection; the assumptions are that each file
        # averages 128 bytes in the manifest, but consumes 1024 bytes
        # of Python data structures, so 1024/128=8 means we estimate
        # the RAM footprint at 8 times the size of bare manifest text.
        return self._manifest_size * 8
653
654     def finalize(self):
655         if self.collection is None:
656             return
657
658         if self.writable():
659             try:
660                 self.collection.save()
661             except Exception as e:
662                 _logger.exception("Failed to save collection %s", self.collection_locator)
663         self.collection.stop_threads()
664
665     def clear(self):
666         if self.collection is not None:
667             self.collection.stop_threads()
668         self._manifest_size = 0
669         super(CollectionDirectory, self).clear()
670         if self.collection_record_file is not None:
671             self.inodes.del_entry(self.collection_record_file)
672         self.collection_record_file = None
673
674
675 class TmpCollectionDirectory(CollectionDirectoryBase):
676     """A directory backed by an Arvados collection that never gets saved.
677
678     This supports using Keep as scratch space. A userspace program can
679     read the .arvados#collection file to get a current manifest in
680     order to save a snapshot of the scratch data or use it as a crunch
681     job output.
682     """
683
    class UnsaveableCollection(arvados.collection.Collection):
        """Collection subclass whose save() and save_new() do nothing."""

        def save(self):
            """Do nothing."""
            pass
        def save_new(self):
            """Do nothing."""
            pass
689
690     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
691         collection = self.UnsaveableCollection(
692             api_client=api_client,
693             keep_client=api_client.keep,
694             num_retries=num_retries,
695             storage_classes_desired=storage_classes)
696         # This is always enable_write=True because it never tries to
697         # save to the backend
698         super(TmpCollectionDirectory, self).__init__(
699             parent_inode, inodes, True, filters, collection, self)
700         self.populate(self.mtime())
701
    def on_event(self, *args, **kwargs):
        """Handle a collection event, then invalidate the collection record file.

        Arguments are passed to CollectionDirectoryBase.on_event.  The
        extra invalidation is skipped when no record file exists yet.
        """
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            return

        # See discussion in CollectionDirectoryBase.on_event
        lockcount = 0
        try:
            while True:
                self.collection.lock.release()
                lockcount += 1
        except RuntimeError:
            # release() raises once this thread no longer holds the lock
            pass

        try:
            with llfuse.lock:
                with self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self.inode)
        finally:
            # Restore the collection lock to the depth it had on entry.
            while lockcount > 0:
                self.collection.lock.acquire()
                lockcount -= 1
726
727     def collection_record(self):
728         with llfuse.lock_released:
729             return {
730                 "uuid": None,
731                 "manifest_text": self.collection.manifest_text(),
732                 "portable_data_hash": self.collection.portable_data_hash(),
733                 "storage_classes_desired": self.collection.storage_classes_desired(),
734             }
735
736     def __contains__(self, k):
737         return (k == '.arvados#collection' or
738                 super(TmpCollectionDirectory, self).__contains__(k))
739
740     @use_counter
741     def __getitem__(self, item):
742         if item == '.arvados#collection':
743             if self.collection_record_file is None:
744                 self.collection_record_file = FuncToJSONFile(
745                     self.inode, self.collection_record)
746                 self.inodes.add_entry(self.collection_record_file)
747             return self.collection_record_file
748         return super(TmpCollectionDirectory, self).__getitem__(item)
749
    def persisted(self):
        """Return False."""
        return False
752
    def writable(self):
        """Return True unconditionally."""
        return True
755
    def flush(self):
        """Do nothing."""
        pass
758
    def want_event_subscribe(self):
        """Return False."""
        return False
761
    def finalize(self):
        """Stop the collection's threads."""
        self.collection.stop_threads()
764
765     def invalidate(self):
766         if self.collection_record_file:
767             self.collection_record_file.invalidate()
768         super(TmpCollectionDirectory, self).invalidate()
769
770
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
        """Create the directory.

        parent_inode, inodes, enable_write and filters are passed to
        Directory.  api and num_retries are used for API requests.  When
        pdh_only is true, only names matching the portable data hash
        pattern are looked up.  storage_classes is handed to every
        ProjectDirectory created by a lookup.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        """Set an attribute, populating the directory once it gets an inode.

        When `inode` is assigned a non-None value and the directory has no
        entries yet, a README file is added; if the directory is the
        filesystem root, a `by_id` subdirectory is added as well.
        """
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    self.pdh_only,
                    # Keep by_id truly identical: without this, projects
                    # reached through by_id lose the storage classes.
                    storage_classes=self.storage_classes,
                ))

    def __contains__(self, k):
        """Return True if `k` is, or can be loaded as, an entry.

        Names already present are accepted immediately.  Otherwise `k`
        must match the portable data hash pattern or (unless pdh_only is
        set) the uuid pattern; the matching project or collection is then
        looked up, and kept as an entry if its update() succeeds.  Any
        exception during the lookup is logged and reported as "not found".
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[
                        ['group_class', 'in', ['project','filter']],
                        ["uuid", "=", k],
                        *self._filters_for('groups', qualified=False),
                    ],
                ).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    project[u'items'][0],
                    storage_classes=self.storage_classes,
                ))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    k,
                ))

            if e.update():
                # Re-check: an entry for k may have appeared while update()
                # was running; in that case drop the duplicate.
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry named `item`, loading it on demand.

        Raises KeyError if `item` cannot be found or loaded.
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        """Do nothing: entries of this directory are never cleared."""
        pass

    def want_event_subscribe(self):
        """Return True unless the directory is restricted to portable data hashes."""
        return not self.pdh_only
889
890
class TagsDirectory(Directory):
    """Directory whose subdirectories are the tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
        """Store the API client settings; polling is always enabled.

        `_extra` collects tag names that were found by a direct lookup in
        __getitem__() so that update() keeps listing them.
        """
        super().__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        self._extra = set()

    def want_event_subscribe(self):
        """Always return True."""
        return True

    @use_counter
    def update(self):
        """Fetch up to 1000 distinct tag names and merge them into the entries.

        The llfuse lock is released for the duration of the API request.
        Names recorded in `_extra` are merged in as well.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '!=', ''],
                    *self._filters_for('links', qualified=False),
                ],
                select=['name'],
                distinct=True,
                limit=1000,
            )
            tags = request.execute(num_retries=self.num_retries)

        if "items" not in tags:
            return

        def name_of(record):
            return record['name']

        def is_same(entry, record):
            return entry.tag == record['name']

        def make_entry(record):
            return TagDirectory(
                self.inode,
                self.inodes,
                self.api,
                self.num_retries,
                self._enable_write,
                self._filters,
                record['name'],
                poll=self._poll,
                poll_time=self._poll_time,
            )

        records = tags['items'] + [{"name": extra} for extra in self._extra]
        self.merge(records, name_of, is_same, make_entry)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the tag directory named `item`.

        If the name is not among the current entries, the API is asked
        whether a tag link with that name exists; if it does, the name is
        remembered in `_extra` and the listing is refreshed.  The final
        lookup is delegated to the parent class.
        """
        parent = super()
        if parent.__contains__(item):
            return parent.__getitem__(item)

        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', item],
                    *self._filters_for('links', qualified=False),
                ],
                limit=1,
            )
            tags = request.execute(num_retries=self.num_retries)

        if tags["items"]:
            self._extra.add(item)
            self.update()
        return parent.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if `k` is an entry or can be found by __getitem__()."""
        if super().__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
966
967
class TagDirectory(Directory):
    """Directory listing, as subdirectories, the collections visible to the
    user that carry one particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                 poll=False, poll_time=60):
        """Remember the API client settings, the tag name and the polling options."""
        super().__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        """Always return True."""
        return True

    @use_counter
    def update(self):
        """List the tag links named `self.tag` that point at collections
        and merge them into the entries, one CollectionDirectory per link
        target.

        The llfuse lock is released for the duration of the API request.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', self.tag],
                    ['head_uuid', 'is_a', 'arvados#collection'],
                    *self._filters_for('links', qualified=False),
                ],
                select=['head_uuid'],
            )
            tagged = request.execute(num_retries=self.num_retries)

        def target_of(link):
            return link['head_uuid']

        def is_same(entry, link):
            return entry.collection_locator == link['head_uuid']

        def make_entry(link):
            return CollectionDirectory(
                self.inode,
                self.inodes,
                self.api,
                self.num_retries,
                self._enable_write,
                self._filters,
                link['head_uuid'],
            )

        self.merge(tagged['items'], target_of, is_same, make_entry)
1011
1012
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    A complete listing is only fetched by update() once items() has been
    called; until then individual names are looked up on demand by
    __getitem__().
    """

    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
                 "project_uuid", "_updating_lock",
                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 project_object, poll=True, poll_time=3, storage_classes=None):
        """Create a directory for `project_object`.

        project_object is the API record of the project (it must have a
        'uuid' key).  storage_classes, when not None, is used as
        "storage_classes_desired" for collections created by mkdir() and
        is passed on to subproject directories.  poll and poll_time are
        stored and passed on to subproject directories.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self._current_user = None
        self._full_listing = False
        self.storage_classes = storage_classes
        self.recursively_contained = False

        # Filter groups can contain themselves, which causes tools
        # that walk the filesystem to get stuck in an infinite loop,
        # so suppress returning a listing in that case.
        if self.project_object.get("group_class") == "filter":
            iter_parent_inode = parent_inode
            while iter_parent_inode != llfuse.ROOT_INODE:
                parent_dir = self.inodes[iter_parent_inode]
                if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
                    self.recursively_contained = True
                    break
                iter_parent_inode = parent_dir.parent_inode

    def want_event_subscribe(self):
        """Always return True."""
        return True

    def createDirectory(self, i):
        """Build the directory or file object representing API record `i`.

        Collections become CollectionDirectory, groups become
        ProjectDirectory, links that point at a collection become a
        CollectionDirectory for the link target, and any other object with
        a valid uuid becomes an ObjectFile.  Returns None for anything
        else.
        """
        common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(*common_args, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(*common_args, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file lives in this directory, so this directory (not its
            # parent) is the file's parent, as for every other child above.
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        """Return the uuid of the project."""
        return self.project_uuid

    def items(self):
        """Return the directory entries, requesting a full listing.

        Sets the flag that makes update() fetch the complete contents of
        the project from then on.
        """
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the (unsanitized) file name for API record `i`, or None.

        Collections, groups and links to collections use their name
        unchanged; other objects get "<name>.<kind without 'arvados#'>".
        Records with a missing or empty name, or that match none of these
        cases, yield None.  Missing 'uuid', 'head_kind' or 'kind' keys are
        tolerated.
        """
        name = i.get('name')
        if not name:
            return None
        uuid = i.get('uuid') or ''
        if collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name
        kind = i.get('kind')
        if kind and kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, if requested, the listing.

        The '.arvados#project' object file is created on first use.  The
        contents are only listed when a full listing has been requested
        (see items()) and the project is not recursively contained in
        itself.  API requests run with the llfuse lock released and are
        serialized by `_updating_lock`.  Returns True.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if self.recursively_contained or not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    # Another thread refreshed the listing while we were
                    # waiting for the lock; that still counts as success.
                    return True

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#group'],
                        ['groups.group_class', 'in', ['project', 'filter']],
                        *self._filters_for('groups', qualified=True),
                    ],
                ))
                # Only keep the current version of each collection.
                contents.extend(obj for obj in arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#collection'],
                        *self._filters_for('collections', qualified=True),
                    ],
                ) if obj['current_version_uuid'] == obj['uuid'])
            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the object for API record `i`, register it with the inode
        table, store it under `name` and return it.
        """
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named `k`.

        '.arvados#project' yields the project object file.  When a full
        listing is loaded, or `k` is already an entry, the lookup is
        answered from the existing entries.  Otherwise the API is queried
        (llfuse lock released) for a subproject, then a collection, owned
        by this project with that name; a match is added as a new entry.
        Raises KeyError if nothing matches.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(
                filters=[
                    ["owner_uuid", "=", self.project_uuid],
                    ["group_class", "in", ["project","filter"]],
                    namefilter,
                    *self._filters_for('groups', qualified=False),
                ],
                limit=2,
            ).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(
                    filters=[
                        ["owner_uuid", "=", self.project_uuid],
                        namefilter,
                        *self._filters_for('collections', qualified=False),
                    ],
                    limit=2,
                ).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if `k` is '.arvados#project' or can be looked up."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return whether the current user may write to this project.

        Always False when the mount was created without write support;
        otherwise True if the current user's uuid is listed in the project
        record's "writable_by".  The current user record is fetched once
        and cached.
        """
        if not self._enable_write:
            return False
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        """Always return True."""
        return True

    def clear(self):
        """Clear the entries and drop the project object file."""
        super(ProjectDirectory, self).clear()
        if self.project_object_file is not None:
            self.inodes.del_entry(self.project_object_file)
        self.project_object_file = None

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called `name` in this project.

        Raises llfuse.FUSEError(EROFS) if the project is not writable and
        llfuse.FUSEError(EEXIST) if the API request fails with an
        apiclient error.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection called `name` from this project.

        Raises llfuse.FUSEError with EROFS (project not writable), ENOENT
        (no such entry), EPERM (entry is not a collection) or ENOTEMPTY
        (collection has contents).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        # Resolve the uuid while the llfuse lock is still held: looking the
        # entry up touches shared directory state and may itself try to
        # release the lock.
        collection_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection `name_old` from project directory `src` to this
        directory under the name `name_new`.

        Raises llfuse.FUSEError(EROFS) if this project is not writable and
        llfuse.FUSEError(EPERM) if `src` is not a ProjectDirectory, the
        entry is not a collection, or `name_new` already exists here.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply an event about an object to the entries of this directory.

        `ev` must have "object_uuid"; "properties" (with "old_attributes"
        and "new_attributes") and "object_owner_uuid" are optional.
        Depending on the old and new name and owner, the matching entry is
        added, renamed or removed.
        """
        properties = ev.get("properties") or {}
        # Work on copies so that filling in "uuid" below does not modify the
        # caller's event.
        old_attrs = dict(properties.get("old_attributes") or {})
        new_attrs = dict(properties.get("new_attributes") or {})
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    # Renamed within this directory: keep the same object.
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1341
1342
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 exclude, poll=False, poll_time=60, storage_classes=None):
        """Create the directory and fetch the current user record.

        NOTE(review): `exclude` and `poll` are accepted but not used here;
        polling is always enabled.  storage_classes is passed on to every
        ProjectDirectory created by update().
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes

    @use_counter
    def update(self):
        """Refresh the list of owners and projects shared with the user.

        Uses the API's groups "shared" method when the discovery document
        advertises it, and otherwise reconstructs the same information
        from plain group and user listings.  API requests run with the
        llfuse lock released and are serialized by `_updating_lock`.
        Exceptions are logged, not raised.
        """
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    page = []
                    while True:
                        resp = self.api.groups().shared(
                            filters=[
                                ['group_class', 'in', ['project','filter']],
                                *page,
                                *self._filters_for('groups', qualified=False),
                            ],
                            order="uuid",
                            limit=10000,
                            count="none",
                            include="owner_uuid",
                        ).execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        # Next page starts after the last uuid seen.
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['group_class', 'in', ['project','filter']],
                            *self._filters_for('groups', qualified=False),
                        ],
                        select=["uuid", "owner_uuid"],
                    ))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A project is a "root" when it is owned neither by the
                    # current user nor by another project in the listing.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['uuid', 'in', list(root_owners)],
                            *self._filters_for('users', qualified=False),
                        ],
                    )
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['uuid', 'in', list(root_owners)+roots],
                            *self._filters_for('groups', qualified=False),
                        ],
                    )
                    for user in lusers:
                        objects[user["uuid"]] = user
                    for group in lgroups:
                        objects[group["uuid"]] = group

                # One entry per known owner, named after the owner.
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Root projects whose owner is unknown are listed directly.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(
                contents.items(),
                lambda i: i[0],
                lambda a, i: a.uuid() == i[1]['uuid'],
                lambda i: ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    i[1],
                    poll=self._poll,
                    poll_time=self._poll_time,
                    storage_classes=self.storage_classes,
                ),
            )
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        """Always return True."""
        return True