1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
32 class Directory(FreshBase):
33     """Generic directory object, backed by a dict.
34
35     Consists of a set of entries with the key representing the filename
36     and the value referencing a File or Directory object.
37     """
38
39     __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
40
41     def __init__(self, parent_inode, inodes, enable_write, filters):
42         """parent_inode is the integer inode number"""
43
44         super(Directory, self).__init__()
45
46         self.inode = None
47         if not isinstance(parent_inode, int):
48             raise Exception("parent_inode should be an int")
49         self.parent_inode = parent_inode
50         self.inodes = inodes
51         self._entries = {}
52         self._mtime = time.time()
53         self._enable_write = enable_write
54         self._filters = filters or []
55
56     def _filters_for(self, subtype, *, qualified):
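        """Yield the filters that apply to `subtype` API records.

        Unqualified filters (whose attribute has no 'type.' prefix) are
        yielded as-is.  Filters qualified with a different type are skipped.
        Filters qualified with `subtype` are yielded with the prefix kept or
        stripped, depending on the `qualified` keyword argument.
        """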
57         for f in self._filters:
58             f_type, _, f_name = f[0].partition('.')
59             if not f_name:
60                 yield f
61             elif f_type != subtype:
62                 pass
63             elif qualified:
64                 yield f
65             else:
66                 yield [f_name, *f[1:]]
67
68     def unsanitize_filename(self, incoming):
69         """Replace ForwardSlashNameSubstitution value with /"""
70         fsns = self.inodes.forward_slash_subst()
71         if isinstance(fsns, str):
72             return incoming.replace(fsns, '/')
73         else:
74             return incoming
75
76     def sanitize_filename(self, dirty):
77         """Replace disallowed filename characters according to
78         ForwardSlashNameSubstitution in self.api_config."""
79         # '.' and '..' are not reachable if the API server is newer than #6277
80         if dirty is None:
81             return None
82         elif dirty == '':
83             return '_'
84         elif dirty == '.':
85             return '_'
86         elif dirty == '..':
87             return '__'
88         else:
89             fsns = self.inodes.forward_slash_subst()
90             if isinstance(fsns, str):
91                 dirty = dirty.replace('/', fsns)
92             return _disallowed_filename_characters.sub('_', dirty)
93
94
95     #  Overridden by subclasses to implement logic to update the
96     #  entries dict when the directory is stale
97     @use_counter
98     def update(self):
99         pass
100
101     # Only used when computing the size of the disk footprint of the directory
102     # (stub)
103     def size(self):
104         return 0
105
106     def persisted(self):
107         return False
108
109     def checkupdate(self):
110         if self.stale():
111             try:
112                 self.update()
113             except apiclient.errors.HttpError as e:
114                 _logger.warning(e)
115
116     @use_counter
117     @check_update
118     def __getitem__(self, item):
119         return self._entries[item]
120
121     @use_counter
122     @check_update
123     def items(self):
124         return list(self._entries.items())
125
126     @use_counter
127     @check_update
128     def __contains__(self, k):
129         return k in self._entries
130
131     @use_counter
132     @check_update
133     def __len__(self):
134         return len(self._entries)
135
136     def fresh(self):
137         self.inodes.touch(self)
138         super(Directory, self).fresh()
139
140     def objsize(self):
141         # Rough estimate of memory footprint based on using pympler
142         return len(self._entries) * 1024
143
144     def merge(self, items, fn, same, new_entry):
145         """Helper method for updating the contents of the directory.
146
147         Takes a list describing the new contents of the directory, reuses
148         entries that are the same in both the old and new lists, creates new
149         entries, and deletes old entries missing from the new list.
150
151         Arguments:
152         * items: Iterable --- New directory contents
153
154         * fn: Callable --- Takes an entry in 'items' and returns the desired file or
155         directory name, or None if this entry should be skipped
156
157         * same: Callable --- Compare an existing entry (a File or Directory
158         object) with an entry in the items list to determine whether to keep
159         the existing entry.
160
161         * new_entry: Callable --- Create a new directory entry (File or Directory
162         object) from an entry in the items list.
163
164         """
165
166         oldentries = self._entries
167         self._entries = {}
168         changed = False
169         for i in items:
170             name = self.sanitize_filename(fn(i))
171             if not name:
172                 continue
173             if name in oldentries:
174                 ent = oldentries[name]
175                 if same(ent, i) and ent.parent_inode == self.inode:
176                     # move existing directory entry over
177                     self._entries[name] = ent
178                     del oldentries[name]
179                     self.inodes.inode_cache.touch(ent)
180
181         for i in items:
182             name = self.sanitize_filename(fn(i))
183             if not name:
184                 continue
185             if name not in self._entries:
186                 # create new directory entry
187                 ent = new_entry(i)
188                 if ent is not None:
189                     self._entries[name] = self.inodes.add_entry(ent)
190                     # need to invalidate this just in case there was a
191                     # previous entry that couldn't be moved over or a
192                     # lookup that returned file not found and cached
193                     # a negative result
194                     self.inodes.invalidate_entry(self, name)
195                     changed = True
196                     _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
197
198         # delete any other directory entries that were not found in 'items'
199         for name, ent in oldentries.items():
200             _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
201             self.inodes.invalidate_entry(self, name)
202             self.inodes.del_entry(ent)
203             changed = True
204
205         if changed:
206             self._mtime = time.time()
207             self.inodes.inode_cache.update_cache_size(self)
208
209         self.fresh()
210
211     def in_use(self):
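        """True if this directory or any of its entries is in use."""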
212         if super(Directory, self).in_use():
213             return True
214         for v in self._entries.values():
215             if v.in_use():
216                 return True
217         return False
218
219     def clear(self):
220         """Delete all entries"""
221         if not self._entries:
222             return
223         oldentries = self._entries
224         self._entries = {}
225         self.invalidate()
226         for name, ent in oldentries.items():
227             ent.clear()
228             self.inodes.invalidate_entry(self, name)
229             self.inodes.del_entry(ent)
230         self.inodes.inode_cache.update_cache_size(self)
231
232     def kernel_invalidate(self):
233         # Invalidating the dentry on the parent implies invalidating all paths
234         # below it as well.
235         if self.parent_inode in self.inodes:
236             parent = self.inodes[self.parent_inode]
237         else:
238             # parent was removed already.
239             return
240
241         # Find self on the parent in order to invalidate this path.
242         # Calling the public items() method might trigger a refresh,
243         # which we definitely don't want, so read the internal dict directly.
244         for k,v in parent._entries.items():
245             if v is self:
246                 self.inodes.invalidate_entry(parent, k)
247                 break
248
249     def mtime(self):
250         return self._mtime
251
252     def writable(self):
253         return False
254
255     def flush(self):
256         pass
257
258     def want_event_subscribe(self):
259         raise NotImplementedError()
260
261     def create(self, name):
262         raise NotImplementedError()
263
264     def mkdir(self, name):
265         raise NotImplementedError()
266
267     def unlink(self, name):
268         raise NotImplementedError()
269
270     def rmdir(self, name):
271         raise NotImplementedError()
272
273     def rename(self, name_old, name_new, src):
274         raise NotImplementedError()
275
276
277 class CollectionDirectoryBase(Directory):
278     """Represent an Arvados Collection as a directory.
279
280     This class is used for Subcollections, and is also the base class for
281     CollectionDirectory, which implements collection loading/saving on
282     Collection records.
283
284     Most operations act only on the underlying Arvados `Collection` object.  The
285     `Collection` object signals via a notify callback to
286     `CollectionDirectoryBase.on_event` that an item was added, removed or
287     modified.  FUSE inodes and directory entries are created, deleted or
288     invalidated in response to these events.
289
290     """
291
292     __slots__ = ("collection", "collection_root", "collection_record_file")
293
294     def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
295         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
296         self.collection = collection
297         self.collection_root = collection_root
298         self.collection_record_file = None
299
300     def new_entry(self, name, item, mtime):
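        """Create or adopt a FUSE entry for a collection item.

        If the item already has a fuse_entry (for example, it was moved
        within the collection), reattach that entry and its existing inode
        instead of allocating a new one.  Subcollections are mapped to
        CollectionDirectoryBase nodes and files to FuseArvadosFile nodes.
        """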
301         name = self.sanitize_filename(name)
302         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
303             if item.fuse_entry.parent_inode is not None:
304                 raise Exception("Can only reparent unparented inode entry")
305             if item.fuse_entry.inode is None:
306                 raise Exception("Reparented entry must still have valid inode")
307             item.fuse_entry.parent_inode = self.inode
308             self._entries[name] = item.fuse_entry
309         elif isinstance(item, arvados.collection.RichCollectionBase):
310             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
311                 self.inode,
312                 self.inodes,
313                 self._enable_write,
314                 self._filters,
315                 item,
316                 self.collection_root,
317             ))
318             self._entries[name].populate(mtime)
319         else:
320             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write, self._poll, self._poll_time))
321         item.fuse_entry = self._entries[name]
322
323     def on_event(self, event, collection, name, item):
324         if event in (arvados.collection.TOK, arvados.collection.WRITE):
325             # We don't care about TOK events (those mean only token
326             # signatures were updated) or WRITE events (those were
327             # initiated locally).
328             return
329
330         # These are events from the Collection object (ADD/DEL/MOD)
331         # emitted by operations on the Collection object (like
332         # "mkdirs" or "remove"), and by "update", which we need to
333         # synchronize with our FUSE objects that are assigned inodes.
334         if collection != self.collection:
335             return
336
337         name = self.sanitize_filename(name)
338
339         #
340         # It's possible for another thread to have llfuse.lock and
341         # be waiting on collection.lock.  Meanwhile, we released
342         # llfuse.lock earlier in the stack, but are still holding
343         # on to the collection lock, and now we need to re-acquire
344         # llfuse.lock.  If we don't release the collection lock,
345         # we'll deadlock where we're holding the collection lock
346         # waiting for llfuse.lock and the other thread is holding
347         # llfuse.lock and waiting for the collection lock.
348         #
349         # The correct locking order here is to take llfuse.lock
350         # first, then the collection lock.
351         #
352         # Since collection.lock is an RLock, it might be locked
353         # multiple times, so we need to release it multiple times,
354         # keep a count, then re-lock it the correct number of
355         # times.
356         #
357         lockcount = 0
358         try:
359             while True:
360                 self.collection.lock.release()
361                 lockcount += 1
362         except RuntimeError:
363             pass
364
365         try:
366             with llfuse.lock:
367                 with self.collection.lock:
368                     if event == arvados.collection.ADD:
369                         self.new_entry(name, item, self.mtime())
370                     elif event == arvados.collection.DEL:
371                         ent = self._entries[name]
372                         del self._entries[name]
373                         self.inodes.invalidate_entry(self, name)
374                         self.inodes.del_entry(ent)
375                     elif event == arvados.collection.MOD:
376                         # MOD events have (modified_from, newitem)
377                         newitem = item[1]
378                         entry = None
379                         if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
380                             entry = newitem.fuse_entry
381                         elif name in self._entries:
382                             entry = self._entries[name]
383
384                         if entry is not None:
385                             entry.invalidate()
386                             self.inodes.invalidate_inode(entry)
387
388                         if name in self._entries:
389                             self.inodes.invalidate_entry(self, name)
390
394                     if self.collection_record_file is not None:
395                         self.collection_record_file.invalidate()
396                         self.inodes.invalidate_inode(self.collection_record_file)
397         finally:
398             while lockcount > 0:
399                 self.collection.lock.acquire()
400                 lockcount -= 1
401
402     def populate(self, mtime):
403         self._mtime = mtime
404         with self.collection.lock:
405             self.collection.subscribe(self.on_event)
406             for entry, item in self.collection.items():
407                 self.new_entry(entry, item, self.mtime())
408
409     def writable(self):
410         return self._enable_write and self.collection.writable()
411
412     @use_counter
413     def flush(self):
414         self.collection_root.flush()
415
416     @use_counter
417     @check_update
418     def create(self, name):
419         if not self.writable():
420             raise llfuse.FUSEError(errno.EROFS)
421         with llfuse.lock_released:
422             self.collection.open(name, "w").close()
423
424     @use_counter
425     @check_update
426     def mkdir(self, name):
427         if not self.writable():
428             raise llfuse.FUSEError(errno.EROFS)
429         with llfuse.lock_released:
430             self.collection.mkdirs(name)
431
432     @use_counter
433     @check_update
434     def unlink(self, name):
435         if not self.writable():
436             raise llfuse.FUSEError(errno.EROFS)
437         with llfuse.lock_released:
438             self.collection.remove(name)
439         self.flush()
440
441     @use_counter
442     @check_update
443     def rmdir(self, name):
444         if not self.writable():
445             raise llfuse.FUSEError(errno.EROFS)
446         with llfuse.lock_released:
447             self.collection.remove(name)
448         self.flush()
449
450     @use_counter
451     @check_update
452     def rename(self, name_old, name_new, src):
453         if not self.writable():
454             raise llfuse.FUSEError(errno.EROFS)
455
456         if not isinstance(src, CollectionDirectoryBase):
457             raise llfuse.FUSEError(errno.EPERM)
458
459         if name_new in self:
460             ent = src[name_old]
461             tgt = self[name_new]
462             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
463                 pass
464             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
465                 if len(tgt) > 0:
466                     raise llfuse.FUSEError(errno.ENOTEMPTY)
467             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
468                 raise llfuse.FUSEError(errno.ENOTDIR)
469             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
470                 raise llfuse.FUSEError(errno.EISDIR)
471
472         with llfuse.lock_released:
473             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
474         self.flush()
475         src.flush()
476
477     def clear(self):
478         super(CollectionDirectoryBase, self).clear()
479         if self.collection is not None:
480             self.collection.unsubscribe()
481         self.collection = None
482
483     def objsize(self):
484         # objsize for the whole collection is represented at the root,
485         # don't double-count it
486         return 0
487
488 class CollectionDirectory(CollectionDirectoryBase):
489     """Represents the root of a directory tree representing a collection."""
490
491     __slots__ = ("api", "num_retries", "collection_locator",
492                  "_manifest_size", "_writable", "_updating_lock")
493
494     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
495         super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
496         self.api = api
497         self.num_retries = num_retries
498         self._poll = True
499
500         if isinstance(collection_record, dict):
501             self.collection_locator = collection_record['uuid']
502             self._mtime = convertTime(collection_record.get('modified_at'))
503         else:
504             self.collection_locator = collection_record
505             self._mtime = 0
506
507         is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)
508
509         if is_uuid:
510             # It is a uuid, so it may be updated upstream; recheck it periodically.
511             self._poll_time = 15
512         else:
513             # It is not a uuid.  An immutable collection only needs
514             # to be refreshed if the mount is very long lived (long
515             # enough that there's a risk of the blob signatures
516             # expiring).
517             try:
518                 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
519             except Exception:
520                 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
521                 self._poll_time = 60*60
522
523         self._writable = is_uuid and enable_write
524         self._manifest_size = 0
525         self._updating_lock = threading.Lock()
526
527     def same(self, i):
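        """True if API record `i` refers to this collection by uuid or portable data hash."""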
528         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
529
530     def writable(self):
531         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
532
533     @use_counter
534     def flush(self):
535         with llfuse.lock_released:
536             with self._updating_lock:
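                # committed() means there are no unsaved local changes, so
                # just refresh from the API server; otherwise push the local
                # changes with save().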
537                 if self.collection.committed():
538                     self.collection.update()
539                 else:
540                     self.collection.save()
541                 self.new_collection_record(self.collection.api_response())
542
543     def want_event_subscribe(self):
544         return (uuid_pattern.match(self.collection_locator) is not None)
545
546     def new_collection(self, new_collection_record, coll_reader):
547         if self.inode:
548             self.clear()
549         self.collection = coll_reader
550         self.new_collection_record(new_collection_record)
551         self.populate(self.mtime())
552
553     def new_collection_record(self, new_collection_record):
554         if not new_collection_record:
555             raise Exception("invalid new_collection_record")
556         self._mtime = convertTime(new_collection_record.get('modified_at'))
557         self._manifest_size = len(new_collection_record["manifest_text"])
558         self.collection_locator = new_collection_record["uuid"]
559         if self.collection_record_file is not None:
560             self.collection_record_file.invalidate()
561             self.inodes.invalidate_inode(self.collection_record_file)
562             _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
563                           self.collection_record_file.inode)
564         self.inodes.update_uuid(self)
565         self.inodes.inode_cache.update_cache_size(self)
566         self.fresh()
567
568     def uuid(self):
569         return self.collection_locator
570
571     @use_counter
572     def update(self):
573         try:
574             if self.collection_locator is None:
575                 # No collection locator to retrieve from
576                 self.fresh()
577                 return True
578
579             new_collection_record = None
580             try:
581                 with llfuse.lock_released:
582                     self._updating_lock.acquire()
583                     if not self.stale():
584                         return True
585
586                     _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
587                     coll_reader = None
588                     if self.collection is not None:
589                         # Already have a collection object
590                         self.collection.update()
591                         new_collection_record = self.collection.api_response()
592                     else:
593                         # Create a new collection object
594                         if uuid_pattern.match(self.collection_locator):
595                             coll_reader = arvados.collection.Collection(
596                                 self.collection_locator, self.api, self.api.keep,
597                                 num_retries=self.num_retries)
598                         else:
599                             coll_reader = arvados.collection.CollectionReader(
600                                 self.collection_locator, self.api, self.api.keep,
601                                 num_retries=self.num_retries)
602                         new_collection_record = coll_reader.api_response() or {}
603                         # If the Collection only exists in Keep, there will be no API
604                         # response.  Fill in the fields we need.
605                         if 'uuid' not in new_collection_record:
606                             new_collection_record['uuid'] = self.collection_locator
607                         if "portable_data_hash" not in new_collection_record:
608                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
609                         if 'manifest_text' not in new_collection_record:
610                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
611                         if 'storage_classes_desired' not in new_collection_record:
612                             new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
613
614                 # end with llfuse.lock_released, re-acquire lock
615
616                 if new_collection_record is not None:
617                     if coll_reader is not None:
618                         self.new_collection(new_collection_record, coll_reader)
619                     else:
620                         self.new_collection_record(new_collection_record)
621
622                 return True
623             finally:
624                 self._updating_lock.release()
625         except arvados.errors.NotFoundError as e:
626             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
627         except arvados.errors.ArgumentError as detail:
628             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
629             if new_collection_record is not None and "manifest_text" in new_collection_record:
630                 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
631         except Exception:
632             _logger.exception("arv-mount %s: error", self.collection_locator)
633             if new_collection_record is not None and "manifest_text" in new_collection_record:
634                 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
635         self.invalidate()
636         return False
637
638     @use_counter
639     @check_update
640     def collection_record(self):
641         self.flush()
642         return self.collection.api_response()
643
644     @use_counter
645     @check_update
646     def __getitem__(self, item):
647         if item == '.arvados#collection':
648             if self.collection_record_file is None:
649                 self.collection_record_file = FuncToJSONFile(
650                     self.inode, self.collection_record)
651                 self.inodes.add_entry(self.collection_record_file)
652             self.invalidate()  # use lookup as a signal to force update
653             return self.collection_record_file
654         else:
655             return super(CollectionDirectory, self).__getitem__(item)
656
657     def __contains__(self, k):
658         if k == '.arvados#collection':
659             return True
660         else:
661             return super(CollectionDirectory, self).__contains__(k)
662
663     def invalidate(self):
664         if self.collection_record_file is not None:
665             self.collection_record_file.invalidate()
666             self.inodes.invalidate_inode(self.collection_record_file)
667         super(CollectionDirectory, self).invalidate()
668
669     def persisted(self):
670         return (self.collection_locator is not None)
671
672     def objsize(self):
673         # This is a rough guess of the amount of overhead involved for
674         # a collection; the assumption is that each file averages
675         # 128 bytes in the manifest but consumes 1024 bytes of
676         # Python data structures, so 1024/128=8 means we estimate
677         # the RAM footprint at 8 times the size of the bare manifest text.
678         return self._manifest_size * 8
679
680     def finalize(self):
681         if self.collection is None:
682             return
683
684         if self.writable():
685             try:
686                 self.collection.save()
687             except Exception:
688                 _logger.exception("Failed to save collection %s", self.collection_locator)
689         self.collection.stop_threads()
690
691     def clear(self):
692         if self.collection is not None:
693             self.collection.stop_threads()
694         self._manifest_size = 0
695         super(CollectionDirectory, self).clear()
696         if self.collection_record_file is not None:
697             self.inodes.del_entry(self.collection_record_file)
698         self.collection_record_file = None
699
700
701 class TmpCollectionDirectory(CollectionDirectoryBase):
702     """A directory backed by an Arvados collection that never gets saved.
703
704     This supports using Keep as scratch space. A userspace program can
705     read the .arvados#collection file to get a current manifest in
706     order to save a snapshot of the scratch data or use it as a crunch
707     job output.
708     """
709
710     class UnsaveableCollection(arvados.collection.Collection):
711         def save(self):
712             pass
713         def save_new(self):
714             pass
715
716     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
717         collection = self.UnsaveableCollection(
718             api_client=api_client,
719             keep_client=api_client.keep,
720             num_retries=num_retries,
721             storage_classes_desired=storage_classes)
722         # This is always enable_write=True because it never tries to
723         # save to the backend
724         super(TmpCollectionDirectory, self).__init__(
725             parent_inode, inodes, True, filters, collection, self)
726         self.populate(self.mtime())
727
728     def on_event(self, *args, **kwargs):
729         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
730         if self.collection_record_file is None:
731             return
732
733         # See discussion in CollectionDirectoryBase.on_event
734         lockcount = 0
735         try:
736             while True:
737                 self.collection.lock.release()
738                 lockcount += 1
739         except RuntimeError:
740             pass
741
742         try:
743             with llfuse.lock:
744                 with self.collection.lock:
745                     self.collection_record_file.invalidate()
746                     self.inodes.invalidate_inode(self.collection_record_file)
747                     _logger.debug("%s invalidated collection record", self.inode)
748         finally:
749             while lockcount > 0:
750                 self.collection.lock.acquire()
751                 lockcount -= 1
752
753     def collection_record(self):
754         with llfuse.lock_released:
755             return {
756                 "uuid": None,
757                 "manifest_text": self.collection.manifest_text(),
758                 "portable_data_hash": self.collection.portable_data_hash(),
759                 "storage_classes_desired": self.collection.storage_classes_desired(),
760             }
761
762     def __contains__(self, k):
763         return (k == '.arvados#collection' or
764                 super(TmpCollectionDirectory, self).__contains__(k))
765
766     @use_counter
767     def __getitem__(self, item):
768         if item == '.arvados#collection':
769             if self.collection_record_file is None:
770                 self.collection_record_file = FuncToJSONFile(
771                     self.inode, self.collection_record)
772                 self.inodes.add_entry(self.collection_record_file)
773             return self.collection_record_file
774         return super(TmpCollectionDirectory, self).__getitem__(item)
775
776     def persisted(self):
777         return False
778
779     def writable(self):
780         return True
781
782     def flush(self):
783         pass
784
785     def want_event_subscribe(self):
786         return False
787
788     def finalize(self):
789         self.collection.stop_threads()
790
791     def invalidate(self):
792         if self.collection_record_file:
793             self.collection_record_file.invalidate()
794         super(TmpCollectionDirectory, self).invalidate()
795
796
797 class MagicDirectory(Directory):
798     """A special directory that logically contains the set of all extant keep locators.
799
800     When a file is referenced by lookup(), it is tested to see if it is a valid
801     keep locator to a manifest, and if so, loads the manifest contents as a
802     subdirectory of this directory with the locator as the directory name.
803     Since querying a list of all extant keep locators is impractical, only
804     collections that have already been accessed are visible to readdir().
805
806     """
807
808     README_TEXT = """
809 This directory provides access to Arvados collections as subdirectories listed
810 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
811 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
812 (in the form 'zzzzz-j7d0g-1234567890abcde').
813
814 Note that this directory will appear empty until you attempt to access a
815 specific collection or project subdirectory (such as trying to 'cd' into it),
816 at which point the collection or project will actually be looked up on the server
817 and the directory will appear if it exists.
818
819 """.lstrip()
820
821     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
822         super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
823         self.api = api
824         self.num_retries = num_retries
825         self.pdh_only = pdh_only
826         self.storage_classes = storage_classes
827
828     def __setattr__(self, name, value):
829         super(MagicDirectory, self).__setattr__(name, value)
830         # When we're assigned an inode, add a README.
831         if ((name == 'inode') and (self.inode is not None) and
832               (not self._entries)):
833             self._entries['README'] = self.inodes.add_entry(
834                 StringFile(self.inode, self.README_TEXT, time.time()))
835             # If we're the root directory, add an identical by_id subdirectory.
836             if self.inode == llfuse.ROOT_INODE:
837                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
838                     self.inode,
839                     self.inodes,
840                     self.api,
841                     self.num_retries,
842                     self._enable_write,
843                     self._filters,
844                     self.pdh_only,
845                 ))
846
847     def __contains__(self, k):
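        """Check whether `k` names a reachable collection or project.

        A successful membership test has the side effect of fetching the
        object from the API server and adding it as a directory entry.
        """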
848         if k in self._entries:
849             return True
850
851         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
852             return False
853
854         try:
855             e = None
856
857             if group_uuid_pattern.match(k):
858                 project = self.api.groups().list(
859                     filters=[
860                         ['group_class', 'in', ['project','filter']],
861                         ["uuid", "=", k],
862                         *self._filters_for('groups', qualified=False),
863                     ],
864                 ).execute(num_retries=self.num_retries)
865                 if project[u'items_available'] == 0:
866                     return False
867                 e = self.inodes.add_entry(ProjectDirectory(
868                     self.inode,
869                     self.inodes,
870                     self.api,
871                     self.num_retries,
872                     self._enable_write,
873                     self._filters,
874                     project[u'items'][0],
875                     storage_classes=self.storage_classes,
876                 ))
877             else:
878                 e = self.inodes.add_entry(CollectionDirectory(
879                     self.inode,
880                     self.inodes,
881                     self.api,
882                     self.num_retries,
883                     self._enable_write,
884                     self._filters,
885                     k,
886                 ))
887
888             if e.update():
889                 if k not in self._entries:
890                     self._entries[k] = e
891                 else:
892                     self.inodes.del_entry(e)
893                 return True
894             else:
895                 self.inodes.invalidate_entry(self, k)
896                 self.inodes.del_entry(e)
897                 return False
898         except Exception as ex:
899             _logger.exception("arv-mount lookup '%s':", k)
900             if e is not None:
901                 self.inodes.del_entry(e)
902             return False
903
904     def __getitem__(self, item):
905         if item in self:
906             return self._entries[item]
907         else:
908             raise KeyError("No collection with id " + item)
909
910     def clear(self):
911         pass
912
913     def want_event_subscribe(self):
914         return not self.pdh_only
915
916
917 class TagsDirectory(Directory):
918     """A special directory that contains as subdirectories all tags visible to the user."""
919
920     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
921         super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
922         self.api = api
923         self.num_retries = num_retries
924         self._poll = True
925         self._poll_time = poll_time
926         self._extra = set()
927
928     def want_event_subscribe(self):
929         return True
930
931     @use_counter
932     def update(self):
933         with llfuse.lock_released:
934             tags = self.api.links().list(
935                 filters=[
936                     ['link_class', '=', 'tag'],
937                     ['name', '!=', ''],
938                     *self._filters_for('links', qualified=False),
939                 ],
940                 select=['name'],
941                 distinct=True,
942                 limit=1000,
943             ).execute(num_retries=self.num_retries)
944         if "items" in tags:
945             self.merge(
946                 tags['items']+[{"name": n} for n in self._extra],
947                 lambda i: i['name'],
948                 lambda a, i: a.tag == i['name'],
949                 lambda i: TagDirectory(
950                     self.inode,
951                     self.inodes,
952                     self.api,
953                     self.num_retries,
954                     self._enable_write,
955                     self._filters,
956                     i['name'],
957                     poll=self._poll,
958                     poll_time=self._poll_time,
959                 ),
960             )
961
962     @use_counter
963     @check_update
964     def __getitem__(self, item):
965         if super(TagsDirectory, self).__contains__(item):
966             return super(TagsDirectory, self).__getitem__(item)
967         with llfuse.lock_released:
968             tags = self.api.links().list(
969                 filters=[
970                     ['link_class', '=', 'tag'],
971                     ['name', '=', item],
972                     *self._filters_for('links', qualified=False),
973                 ],
974                 limit=1,
975             ).execute(num_retries=self.num_retries)
976         if tags["items"]:
977             self._extra.add(item)
978             self.update()
979         return super(TagsDirectory, self).__getitem__(item)
980
981     @use_counter
982     @check_update
983     def __contains__(self, k):
984         if super(TagsDirectory, self).__contains__(k):
985             return True
986         try:
987             self[k]
988             return True
989         except KeyError:
990             pass
991         return False
992
993
994 class TagDirectory(Directory):
995     """A special directory that contains as subdirectories all collections visible
996     to the user that are tagged with a particular tag.
997     """
998
999     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
1000                  poll=False, poll_time=60):
1001         super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1002         self.api = api
1003         self.num_retries = num_retries
1004         self.tag = tag
1005         self._poll = poll
1006         self._poll_time = poll_time
1007
1008     def want_event_subscribe(self):
1009         return True
1010
1011     @use_counter
1012     def update(self):
1013         with llfuse.lock_released:
1014             taggedcollections = self.api.links().list(
1015                 filters=[
1016                     ['link_class', '=', 'tag'],
1017                     ['name', '=', self.tag],
1018                     ['head_uuid', 'is_a', 'arvados#collection'],
1019                     *self._filters_for('links', qualified=False),
1020                 ],
1021                 select=['head_uuid'],
1022             ).execute(num_retries=self.num_retries)
1023         self.merge(
1024             taggedcollections['items'],
1025             lambda i: i['head_uuid'],
1026             lambda a, i: a.collection_locator == i['head_uuid'],
1027             lambda i: CollectionDirectory(
1028                 self.inode,
1029                 self.inodes,
1030                 self.api,
1031                 self.num_retries,
1032                 self._enable_write,
1033                 self._filters,
1034                 i['head_uuid'],
1035             ),
1036         )
1037
1038
1039 class ProjectDirectory(Directory):
1040     """A special directory that contains the contents of a project."""
1041
1042     __slots__ = ("api", "num_retries", "project_object", "project_object_file",
1043                  "project_uuid", "_updating_lock",
1044                  "_current_user", "_full_listing", "storage_classes", "recursively_contained")
1045
1046     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1047                  project_object, poll=True, poll_time=15, storage_classes=None):
1048         super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1049         self.api = api
1050         self.num_retries = num_retries
1051         self.project_object = project_object
1052         self.project_object_file = None
1053         self.project_uuid = project_object['uuid']
1054         self._poll = poll
1055         self._poll_time = poll_time
1056         self._updating_lock = threading.Lock()
1057         self._current_user = None
1058         self._full_listing = False
1059         self.storage_classes = storage_classes
1060         self.recursively_contained = False
1061
1062         # Filter groups can contain themselves, which causes tools
1063         # that walk the filesystem to get stuck in an infinite loop,
1064         # so suppress returning a listing in that case.
1065         if self.project_object.get("group_class") == "filter":
1066             iter_parent_inode = parent_inode
1067             while iter_parent_inode != llfuse.ROOT_INODE:
1068                 parent_dir = self.inodes[iter_parent_inode]
1069                 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1070                     self.recursively_contained = True
1071                     break
1072                 iter_parent_inode = parent_dir.parent_inode
1073
1074     def want_event_subscribe(self):
1075         return True
1076
1077     def createDirectory(self, i):
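        """Create the appropriate FUSE node for project item `i`.

        Collections and collection name links become CollectionDirectory
        nodes, subprojects become ProjectDirectory nodes, other Arvados
        objects are exposed as ObjectFile nodes, and anything else is
        skipped (returns None).
        """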
1078         common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1079         if collection_uuid_pattern.match(i['uuid']):
1080             return CollectionDirectory(*common_args, i)
1081         elif group_uuid_pattern.match(i['uuid']):
1082             return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1083         elif link_uuid_pattern.match(i['uuid']):
1084             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1085                 return CollectionDirectory(*common_args, i['head_uuid'])
1086             else:
1087                 return None
1088         elif uuid_pattern.match(i['uuid']):
1089             return ObjectFile(self.parent_inode, i)
1090         else:
1091             return None
1092
1093     def uuid(self):
1094         return self.project_uuid
1095
1096     def items(self):
1097         self._full_listing = True
1098         return super(ProjectDirectory, self).items()
1099
1100     def namefn(self, i):
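        """Return the directory entry name to use for API record `i`, or None to skip it."""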
1101         if 'name' in i:
1102             if i['name'] is None or len(i['name']) == 0:
1103                 return None
1104             elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1105                 # collection or subproject
1106                 return i['name']
1107             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1108                 # name link
1109                 return i['name']
1110             elif 'kind' in i and i['kind'].startswith('arvados#'):
1111                 # something else
1112                 return "{}.{}".format(i['name'], i['kind'][8:])
1113         else:
1114             return None
1115
1116
1117     @use_counter
1118     def update(self):
1119         if self.project_object_file is None:
1120             self.project_object_file = ObjectFile(self.inode, self.project_object)
1121             self.inodes.add_entry(self.project_object_file)
1122
1123         if self.recursively_contained or not self._full_listing:
1124             return True
1125
1126         def samefn(a, i):
1127             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1128                 return a.uuid() == i['uuid']
1129             elif isinstance(a, ObjectFile):
1130                 return a.uuid() == i['uuid'] and not a.stale()
1131             return False
1132
1133         try:
1134             with llfuse.lock_released:
1135                 self._updating_lock.acquire()
1136                 if not self.stale():
1137                     return
1138
1139                 if group_uuid_pattern.match(self.project_uuid):
1140                     self.project_object = self.api.groups().get(
1141                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
1142                 elif user_uuid_pattern.match(self.project_uuid):
1143                     self.project_object = self.api.users().get(
1144                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
1145                 # do this in 2 steps until #17424 is fixed
1146                 contents = list(arvados.util.keyset_list_all(
1147                     self.api.groups().contents,
1148                     order_key='uuid',
1149                     num_retries=self.num_retries,
1150                     uuid=self.project_uuid,
1151                     filters=[
1152                         ['uuid', 'is_a', 'arvados#group'],
1153                         ['groups.group_class', 'in', ['project', 'filter']],
1154                         *self._filters_for('groups', qualified=True),
1155                     ],
1156                 ))
1157                 contents.extend(obj for obj in arvados.util.keyset_list_all(
1158                     self.api.groups().contents,
1159                     order_key='uuid',
1160                     num_retries=self.num_retries,
1161                     uuid=self.project_uuid,
1162                     filters=[
1163                         ['uuid', 'is_a', 'arvados#collection'],
1164                         *self._filters_for('collections', qualified=True),
1165                     ],
1166                 ) if obj['current_version_uuid'] == obj['uuid'])
1167             # end with llfuse.lock_released, re-acquire lock
1168
1169             self.merge(contents,
1170                        self.namefn,
1171                        samefn,
1172                        self.createDirectory)
1173             return True
1174         finally:
1175             self._updating_lock.release()
1176
1177     def _add_entry(self, i, name):
1178         ent = self.createDirectory(i)
1179         self._entries[name] = self.inodes.add_entry(ent)
1180         return self._entries[name]
1181
1182     @use_counter
1183     @check_update
1184     def __getitem__(self, k):
1185         if k == '.arvados#project':
1186             return self.project_object_file
1187         elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1188             return super(ProjectDirectory, self).__getitem__(k)
1189         with llfuse.lock_released:
1190             k2 = self.unsanitize_filename(k)
1191             if k2 == k:
1192                 namefilter = ["name", "=", k]
1193             else:
1194                 namefilter = ["name", "in", [k, k2]]
1195             contents = self.api.groups().list(
1196                 filters=[
1197                     ["owner_uuid", "=", self.project_uuid],
1198                     ["group_class", "in", ["project","filter"]],
1199                     namefilter,
1200                     *self._filters_for('groups', qualified=False),
1201                 ],
1202                 limit=2,
1203             ).execute(num_retries=self.num_retries)["items"]
1204             if not contents:
1205                 contents = self.api.collections().list(
1206                     filters=[
1207                         ["owner_uuid", "=", self.project_uuid],
1208                         namefilter,
1209                         *self._filters_for('collections', qualified=False),
1210                     ],
1211                     limit=2,
1212                 ).execute(num_retries=self.num_retries)["items"]
1213         if contents:
1214             if len(contents) > 1 and contents[1]['name'] == k:
1215                 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1216                 # "foo[SUBST]bar".
1217                 contents = [contents[1]]
1218             name = self.sanitize_filename(self.namefn(contents[0]))
1219             if name != k:
1220                 raise KeyError(k)
1221             return self._add_entry(contents[0], name)
1222
1223         # Didn't find item
1224         raise KeyError(k)
1225
1226     def __contains__(self, k):
1227         if k == '.arvados#project':
1228             return True
1229         try:
1230             self[k]
1231             return True
1232         except KeyError:
1233             pass
1234         return False
1235
1236     @use_counter
1237     @check_update
1238     def writable(self):
1239         if not self._enable_write:
1240             return False
1241         return self.project_object.get("can_write") is True
1242
1243     def persisted(self):
1244         return True
1245
1246     def clear(self):
1247         super(ProjectDirectory, self).clear()
1248         if self.project_object_file is not None:
1249             self.inodes.del_entry(self.project_object_file)
1250         self.project_object_file = None
1251
1252     @use_counter
1253     @check_update
1254     def mkdir(self, name):
1255         if not self.writable():
1256             raise llfuse.FUSEError(errno.EROFS)
1257
1258         try:
1259             with llfuse.lock_released:
1260                 c = {
1261                     "owner_uuid": self.project_uuid,
1262                     "name": name,
1263                     "manifest_text": "" }
1264                 if self.storage_classes is not None:
1265                     c["storage_classes_desired"] = self.storage_classes
1267                 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1270             self.invalidate()
1271         except apiclient_errors.Error as error:
1272             _logger.error(error)
1273             raise llfuse.FUSEError(errno.EEXIST)
1274
1275     @use_counter
1276     @check_update
1277     def rmdir(self, name):
1278         if not self.writable():
1279             raise llfuse.FUSEError(errno.EROFS)
1280
1281         if name not in self:
1282             raise llfuse.FUSEError(errno.ENOENT)
1283         if not isinstance(self[name], CollectionDirectory):
1284             raise llfuse.FUSEError(errno.EPERM)
1285         if len(self[name]) > 0:
1286             raise llfuse.FUSEError(errno.ENOTEMPTY)
1287         with llfuse.lock_released:
1288             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1289         self.invalidate()
1290
1291     @use_counter
1292     @check_update
1293     def rename(self, name_old, name_new, src):
1294         if not self.writable():
1295             raise llfuse.FUSEError(errno.EROFS)
1296
1297         if not isinstance(src, ProjectDirectory):
1298             raise llfuse.FUSEError(errno.EPERM)
1299
1300         ent = src[name_old]
1301
1302         if not isinstance(ent, CollectionDirectory):
1303             raise llfuse.FUSEError(errno.EPERM)
1304
1305         if name_new in self:
1306             # POSIX semantics for replacing one directory with another are
1307             # tricky (the target directory must be empty, and the operation
1308             # must be atomic, which isn't possible with the Arvados API as of
1309             # this writing), so don't support that.
1310             raise llfuse.FUSEError(errno.EPERM)
1311
1312         self.api.collections().update(uuid=ent.uuid(),
1313                                       body={"owner_uuid": self.uuid(),
1314                                             "name": name_new}).execute(num_retries=self.num_retries)
1315
1316         # Actually move the entry from the source directory to this directory.
1317         del src._entries[name_old]
1318         self._entries[name_new] = ent
1319         self.inodes.invalidate_entry(src, name_old)
1320
1321     @use_counter
1322     def child_event(self, ev):
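        """Apply a websocket event for an object in this project to the directory listing."""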
1323         properties = ev.get("properties") or {}
1324         old_attrs = properties.get("old_attributes") or {}
1325         new_attrs = properties.get("new_attributes") or {}
1326         old_attrs["uuid"] = ev["object_uuid"]
1327         new_attrs["uuid"] = ev["object_uuid"]
1328         old_name = self.sanitize_filename(self.namefn(old_attrs))
1329         new_name = self.sanitize_filename(self.namefn(new_attrs))
1330
1331         # create events will have a new name, but not an old name
1332         # delete events will have an old name, but not a new name
1333         # update events will have an old and new name, and they may be the same or different;
1334         # if they are the same, an unrelated field changed and there is nothing to do.
1335
1336         if old_attrs.get("owner_uuid") != self.project_uuid:
1337             # Was moved from somewhere else, so don't try to remove entry.
1338             old_name = None
1339         if ev.get("object_owner_uuid") != self.project_uuid:
1340             # Was moved to somewhere else, so don't try to add entry
1341             new_name = None
1342
1343         if old_attrs.get("is_trashed"):
1344             # Was previously deleted
1345             old_name = None
1346         if new_attrs.get("is_trashed"):
1347             # Has been deleted
1348             new_name = None
1349
1350         if new_name != old_name:
1351             ent = None
1352             if old_name in self._entries:
1353                 ent = self._entries[old_name]
1354                 del self._entries[old_name]
1355                 self.inodes.invalidate_entry(self, old_name)
1356
1357             if new_name:
1358                 if ent is not None:
1359                     self._entries[new_name] = ent
1360                 else:
1361                     self._add_entry(new_attrs, new_name)
1362             elif ent is not None:
1363                 self.inodes.del_entry(ent)
1364
1365
1366 class SharedDirectory(Directory):
1367     """A special directory that represents users or groups who have shared projects with me."""
1368
1369     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1370                  exclude, poll=False, poll_time=60, storage_classes=None):
1371         super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1372         self.api = api
1373         self.num_retries = num_retries
1374         self.current_user = api.users().current().execute(num_retries=num_retries)
1375         self._poll = True
1376         self._poll_time = poll_time
1377         self._updating_lock = threading.Lock()
1378         self.storage_classes = storage_classes
1379
1380     @use_counter
1381     def update(self):
1382         try:
1383             with llfuse.lock_released:
1384                 self._updating_lock.acquire()
1385                 if not self.stale():
1386                     return
1387
1388                 contents = {}
1389                 roots = []
1390                 root_owners = set()
1391                 objects = {}
1392
1393                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1394                 if 'httpMethod' in methods.get('shared', {}):
1395                     page = []
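                        # Keyset pagination: each pass filters on uuid > the
                        # last uuid returned by the previous page.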
1396                     while True:
1397                         resp = self.api.groups().shared(
1398                             filters=[
1399                                 ['group_class', 'in', ['project','filter']],
1400                                 *page,
1401                                 *self._filters_for('groups', qualified=False),
1402                             ],
1403                             order="uuid",
1404                             limit=10000,
1405                             count="none",
1406                             include="owner_uuid",
1407                         ).execute()
1408                         if not resp["items"]:
1409                             break
1410                         page = [["uuid", ">", resp["items"][-1]["uuid"]]]
1411                         for r in resp["items"]:
1412                             objects[r["uuid"]] = r
1413                             roots.append(r["uuid"])
1414                         for r in resp["included"]:
1415                             objects[r["uuid"]] = r
1416                             root_owners.add(r["uuid"])
1417                 else:
1418                     all_projects = list(arvados.util.keyset_list_all(
1419                         self.api.groups().list,
1420                         order_key="uuid",
1421                         num_retries=self.num_retries,
1422                         filters=[
1423                             ['group_class', 'in', ['project','filter']],
1424                             *self._filters_for('groups', qualified=False),
1425                         ],
1426                         select=["uuid", "owner_uuid"],
1427                     ))
1428                     for ob in all_projects:
1429                         objects[ob['uuid']] = ob
1430
1431                     current_uuid = self.current_user['uuid']
1432                     for ob in all_projects:
1433                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1434                             roots.append(ob['uuid'])
1435                             root_owners.add(ob['owner_uuid'])
1436
1437                     lusers = arvados.util.keyset_list_all(
1438                         self.api.users().list,
1439                         order_key="uuid",
1440                         num_retries=self.num_retries,
1441                         filters=[
1442                             ['uuid', 'in', list(root_owners)],
1443                             *self._filters_for('users', qualified=False),
1444                         ],
1445                     )
1446                     lgroups = arvados.util.keyset_list_all(
1447                         self.api.groups().list,
1448                         order_key="uuid",
1449                         num_retries=self.num_retries,
1450                         filters=[
1451                             ['uuid', 'in', list(root_owners)+roots],
1452                             *self._filters_for('groups', qualified=False),
1453                         ],
1454                     )
1455                     for l in lusers:
1456                         objects[l["uuid"]] = l
1457                     for l in lgroups:
1458                         objects[l["uuid"]] = l
1459
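                # Build the listing: owners of shared projects are listed by
                # name, and shared projects whose owner is not itself visible
                # are listed directly.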
1460                 for r in root_owners:
1461                     if r in objects:
1462                         obr = objects[r]
1463                         if obr.get("name"):
1464                             contents[obr["name"]] = obr
1465                         elif "first_name" in obr:
1466                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1467
1468                 for r in roots:
1469                     if r in objects:
1470                         obr = objects[r]
1471                         if obr['owner_uuid'] not in objects:
1472                             contents[obr["name"]] = obr
1473
1474             # end with llfuse.lock_released, re-acquire lock
1475
1476             self.merge(
1477                 contents.items(),
1478                 lambda i: i[0],
1479                 lambda a, i: a.uuid() == i[1]['uuid'],
1480                 lambda i: ProjectDirectory(
1481                     self.inode,
1482                     self.inodes,
1483                     self.api,
1484                     self.num_retries,
1485                     self._enable_write,
1486                     self._filters,
1487                     i[1],
1488                     poll=self._poll,
1489                     poll_time=self._poll_time,
1490                     storage_classes=self.storage_classes,
1491                 ),
1492             )
1493         except Exception:
1494             _logger.exception("arv-mount shared dir error")
1495         finally:
1496             self._updating_lock.release()
1497
1498     def want_event_subscribe(self):
1499         return True