]> git.arvados.org - arvados.git/blob - services/fuse/arvados_fuse/fusedir.py
22420: Refactor Collection automatic update and merge
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")

    def __init__(self, parent_inode, inodes, enable_write, filters):
        """parent_inode is the integer inode number"""

        super(Directory, self).__init__()

        # Inode number is assigned later, when this entry is registered
        # with the Inodes container.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        # Maps filename -> File/Directory object.
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write
        self._filters = filters or []

    def _filters_for(self, subtype, *, qualified):
        """Yield the subset of self._filters that apply to `subtype`.

        Unqualified filters (no "type." prefix on the attribute name) always
        apply and are yielded unchanged.  Filters qualified with a different
        subtype are skipped.  Filters qualified with this subtype are yielded
        with the prefix kept or stripped according to `qualified`.
        """
        for f in self._filters:
            f_type, _, f_name = f[0].partition('.')
            if not f_name:
                yield f
            elif f_type != subtype:
                pass
            elif qualified:
                yield f
            else:
                yield [f_name, *f[1:]]

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.inodes.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config."""
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.inodes.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Refresh contents if stale; log and swallow API HTTP errors
        so a transient server failure does not break directory access."""
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def objsize(self):
        # Rough estimate of memory footprint based on using pympler
        return len(self._entries) * 1024

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        Arguments:
        * items: Iterable --- New directory contents

        * fn: Callable --- Takes an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        * same: Callable --- Compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        * new_entry: Callable --- Create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        # First pass: carry over entries that are unchanged.
        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name in oldentries:
                ent = oldentries[name]
                if same(ent, i) and ent.parent_inode == self.inode:
                    # move existing directory entry over
                    self._entries[name] = ent
                    del oldentries[name]
                    self.inodes.inode_cache.touch(ent)

        # Second pass: create entries for anything not carried over.
        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name not in self._entries:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    # need to invalidate this just in case there was a
                    # previous entry that couldn't be moved over or a
                    # lookup that returned file not found and cached
                    # a negative result
                    self.inodes.invalidate_entry(self, name)
                    changed = True
                    # Log only when an entry was actually created: when
                    # new_entry() returns None, ent.inode would raise
                    # AttributeError here.
                    _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)

        # delete any other directory entries that were not in found in 'items'
        for name, ent in oldentries.items():
            _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self._mtime = time.time()
            self.inodes.inode_cache.update_cache_size(self)

        self.fresh()

    def in_use(self):
        # A directory is in use if it, or any entry inside it, is in use.
        if super(Directory, self).in_use():
            return True
        for v in self._entries.values():
            if v.in_use():
                return True
        return False

    def clear(self):
        """Delete all entries"""
        if not self._entries:
            return
        oldentries = self._entries
        self._entries = {}
        self.invalidate()
        for name, ent in oldentries.items():
            ent.clear()
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
        self.inodes.inode_cache.update_cache_size(self)

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        if self.parent_inode in self.inodes:
            parent = self.inodes[self.parent_inode]
        else:
            # parent was removed already.
            return

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    # The following operations are abstract here; writable subclasses
    # (e.g. collection-backed directories) override them.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
275
276
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only the underlying Arvados `Collection` object.  The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    __slots__ = ("collection", "collection_root", "collection_record_file")

    def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
        # The arvados Collection (or subcollection) object backing this directory.
        self.collection = collection
        # The CollectionDirectory at the root of this collection's tree;
        # flush() is delegated to it.
        self.collection_root = collection_root
        # Lazily-created special file exposing the collection API record
        # (set by subclasses that serve '.arvados#collection').
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the FUSE entry for one collection item.

        If the item already carries a fuse_entry (e.g. it was moved within
        the collection), reparent that entry instead of creating a new one.
        Subcollections become nested CollectionDirectoryBase instances;
        anything else becomes a FuseArvadosFile.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.parent_inode is not None:
                raise Exception("Can only reparent unparented inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.parent_inode = self.inode
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
                self.inode,
                self.inodes,
                self._enable_write,
                self._filters,
                item,
                self.collection_root,
            ))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
        # Back-link so future events for this item can find its FUSE entry.
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            #
            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock.  Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock.  If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            #
            lockcount = 0
            try:
                # Fully release the (possibly recursively held) RLock;
                # release() raises RuntimeError once we no longer hold it.
                while True:
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                pass

            try:
                # Re-acquire in the correct order: llfuse.lock first,
                # then the collection lock.
                with llfuse.lock:
                    with self.collection.lock:
                        if event == arvados.collection.ADD:
                            self.new_entry(name, item, self.mtime())
                        elif event == arvados.collection.DEL:
                            ent = self._entries[name]
                            del self._entries[name]
                            self.inodes.invalidate_entry(self, name)
                            self.inodes.del_entry(ent)
                        elif event == arvados.collection.MOD:
                            # MOD events have (modified_from, newitem)
                            newitem = item[1]
                            if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
                                newitem.fuse_entry.invalidate()
                                self.inodes.invalidate_inode(newitem.fuse_entry)
                            elif name in self._entries:
                                self._entries[name].invalidate()
                                self.inodes.invalidate_inode(self._entries[name])
                        # we don't care about TOK events, those mean
                        # only token signatures were updated

                        # Any event makes the cached API record stale.
                        if self.collection_record_file is not None:
                            self.collection_record_file.invalidate()
                            self.inodes.invalidate_inode(self.collection_record_file)
            finally:
                # Restore the collection RLock to its original recursion depth.
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1

    def populate(self, mtime):
        """Build directory entries from the current collection contents and
        subscribe to future change events."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        # Saving happens at the collection root, not per-subdirectory.
        self.collection_root.flush()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file (FUSE create)."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Rename/move an entry, enforcing POSIX rename semantics
        (ENOTEMPTY/ENOTDIR/EISDIR) when the target already exists."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        # Cross-collection moves are only supported between collection dirs.
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        super(CollectionDirectoryBase, self).clear()
        # Stop receiving events for a collection we no longer represent.
        if self.collection is not None:
            self.collection.unsubscribe()
        self.collection = None

    def objsize(self):
        # objsize for the whole collection is represented at the root,
        # don't double-count it
        return 0
472
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    __slots__ = ("api", "num_retries", "collection_locator",
                 "_manifest_size", "_writable", "_updating_lock")

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
        """collection_record may be an API record dict, a collection uuid or
        portable data hash string, or None."""
        super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0

        is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)

        if is_uuid:
            # It is a uuid, it may be updated upstream, so recheck it periodically.
            self._poll_time = 15
        else:
            # It is not a uuid.  For immutable collections, collection
            # only needs to be refreshed if it is very long lived
            # (long enough that there's a risk of the blob signatures
            # expiring).
            try:
                self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
            except Exception:
                # Narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit are not swallowed here.
                _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
                self._poll_time = 60*60

        self._writable = is_uuid and enable_write
        self._manifest_size = 0
        # Serializes update()/flush() so only one thread talks to the API
        # server for this collection at a time.
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if API record `i` refers to this collection."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        # Before the collection object exists, fall back to the writability
        # determined from the locator at construction time.
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Commit local changes (or merge remote ones) and refresh the record."""
        if not self.writable():
            return
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        # Only subscribe for real collection uuids; a portable data hash is
        # immutable and a None locator (Keep-only collection) has nothing to
        # subscribe to (uuid_pattern.match(None) would raise TypeError).
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the backing collection object and repopulate entries."""
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        """Adopt a fresh API record: update mtime/size/locator bookkeeping
        and invalidate the cached '.arvados#collection' file."""
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
                          self.collection_record_file.inode)
        self.inodes.update_uuid(self)
        self.inodes.inode_cache.update_cache_size(self)
        self.fresh()

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self):
        """Fetch/refresh the collection from the API server.

        Returns True on success (including "nothing to do"), False after
        logging an error.  Runs the API work with llfuse.lock released and
        _updating_lock held to serialize concurrent updaters.
        """
        try:
            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            new_collection_record = None
            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread already refreshed us while we waited.
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def collection_record(self):
        """Return the (flushed) API record for this collection."""
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is a rough guess of the amount of overhead involved for
        # a collection; the assumptions are that that each file
        # averages 128 bytes in the manifest, but consume 1024 bytes
        # of Python data structures, so 1024/128=8 means we estimate
        # the RAM footprint at 8 times the size of bare manifest text.
        return self._manifest_size * 8

    def finalize(self):
        """Best-effort save of pending changes and shutdown of the
        collection's background threads."""
        if self.collection is None:
            return

        if self.writable():
            try:
                self.collection.save()
            except Exception:
                # Best-effort: log the failure but keep shutting down.
                _logger.exception("Failed to save collection %s", self.collection_locator)
        self.collection.stop_threads()

    def clear(self):
        if self.collection is not None:
            self.collection.stop_threads()
        self._manifest_size = 0
        super(CollectionDirectory, self).clear()
        if self.collection_record_file is not None:
            self.inodes.del_entry(self.collection_record_file)
        self.collection_record_file = None
686
687
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        # Override save/save_new as no-ops so this collection is never
        # written back to the API server.
        def save(self):
            pass
        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, True, filters, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        # After the base class handles the event, invalidate the synthesized
        # '.arvados#collection' record file (if it has been created).
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            return

        # See discussion in CollectionDirectoryBase.on_event
        lockcount = 0
        try:
            # Fully release the recursively-held collection RLock,
            # counting how many times we held it.
            while True:
                self.collection.lock.release()
                lockcount += 1
        except RuntimeError:
            pass

        try:
            # Re-acquire in the correct order: llfuse.lock first, then
            # the collection lock.
            with llfuse.lock:
                with self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self.inode)
        finally:
            # Restore the collection lock to its original recursion depth.
            while lockcount > 0:
                self.collection.lock.acquire()
                lockcount -= 1

    def collection_record(self):
        # Synthesize a minimal record; this collection has no API-side
        # uuid because it is never saved.
        with llfuse.lock_released:
            return {
                "uuid": None,
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
                "storage_classes_desired": self.collection.storage_classes_desired(),
            }

    def __contains__(self, k):
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    @use_counter
    def __getitem__(self, item):
        # Lazily create the special '.arvados#collection' record file.
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def persisted(self):
        return False

    def writable(self):
        return True

    def flush(self):
        # Nothing to flush: this collection is never saved.
        pass

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
782
783
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
        """parent_inode: inode of the enclosing directory.

        When pdh_only is true, only portable data hashes (never uuids) are
        accepted as entry names.  storage_classes is passed through to
        ProjectDirectory entries created by __contains__.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        """Intercept attribute assignment to populate README lazily.

        The README (and, at the mount root, the by_id alias) cannot be
        created in __init__ because the inode is assigned afterwards.
        """
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                # NOTE(review): storage_classes is not forwarded to the
                # by_id copy -- confirm whether that is intentional.
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    self.pdh_only,
                ))

    def __contains__(self, k):
        """Test whether k names a reachable collection or project.

        On the first successful lookup, the corresponding directory object
        is created, validated against the server via update(), and cached
        in _entries.  Returns False (never raises) on any failure.
        """
        if k in self._entries:
            return True

        # Cheap syntactic rejection before contacting the server.
        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[
                        ['group_class', 'in', ['project','filter']],
                        ["uuid", "=", k],
                        *self._filters_for('groups', qualified=False),
                    ],
                ).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    project[u'items'][0],
                    storage_classes=self.storage_classes,
                ))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    k,
                ))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Another lookup won the race; keep the existing entry
                    # and discard the one we just built.
                    self.inodes.del_entry(e)
                return True
            else:
                # Server-side lookup failed; drop the placeholder entry.
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        # __contains__ performs the lookup and populates _entries on success.
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Entries are created on demand; there is nothing to proactively clear.
        pass

    def want_event_subscribe(self):
        # uuid entries can change server-side; pdh-only content is immutable.
        return not self.pdh_only
902
903
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
        """parent_inode: inode of the enclosing directory."""
        super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        # Always poll for tag changes, every poll_time seconds.
        self._poll = True
        self._poll_time = poll_time
        # Tag names discovered via __getitem__ lookups; update() keeps
        # these in the listing even if they fall outside the first page.
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the tag listing from the API server.

        Fetches up to 1000 distinct tag names and merges them -- together
        with any names recorded in _extra -- as TagDirectory entries.
        """
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '!=', ''],
                    *self._filters_for('links', qualified=False),
                ],
                select=['name'],
                distinct=True,
                limit=1000,
            ).execute(num_retries=self.num_retries)
        if "items" in tags:
            self.merge(
                tags['items']+[{"name": n} for n in self._extra],
                lambda i: i['name'],
                lambda a, i: a.tag == i['name'],
                lambda i: TagDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    i['name'],
                    poll=self._poll,
                    poll_time=self._poll_time,
                ),
            )

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up a single tag by name, querying the server on a miss."""
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', item],
                    *self._filters_for('links', qualified=False),
                ],
                limit=1,
            ).execute(num_retries=self.num_retries)
        if tags["items"]:
            # Remember this name so future update() calls keep it listed.
            self._extra.add(item)
            self.update()
        return super(TagsDirectory, self).__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        # Fall back on __getitem__, which consults the server.
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False
979
980
class TagDirectory(Directory):
    """Directory listing every collection visible to the user with one tag.

    Each entry is a CollectionDirectory named after the tagged collection's
    uuid; the listing is rebuilt from the API server's tag links.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        """Subscribe to events so tag changes show up promptly."""
        return True

    @use_counter
    def update(self):
        """Re-list collections carrying this tag and merge into the directory."""
        # Release the FUSE lock while talking to the API server.
        with llfuse.lock_released:
            tagged = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', self.tag],
                    ['head_uuid', 'is_a', 'arvados#collection'],
                    *self._filters_for('links', qualified=False),
                ],
                select=['head_uuid'],
            ).execute(num_retries=self.num_retries)

        def name_of(link):
            # Entries are named after the tagged collection's uuid.
            return link['head_uuid']

        def is_same(entry, link):
            return entry.collection_locator == link['head_uuid']

        def make_dir(link):
            return CollectionDirectory(
                self.inode,
                self.inodes,
                self.api,
                self.num_retries,
                self._enable_write,
                self._filters,
                link['head_uuid'],
            )

        self.merge(tagged['items'], name_of, is_same, make_dir)
1024
1025
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
                 "project_uuid", "_updating_lock",
                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 project_object, poll=True, poll_time=3, storage_classes=None):
        """parent_inode: inode of the enclosing directory.

        project_object is the API record (a group or a user) whose contents
        this directory lists.  storage_classes, when not None, is applied
        to collections created via mkdir() and propagated to subprojects.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Lazily-created ObjectFile exposing the raw project record as the
        # '.arvados#project' entry; created on first update().
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes update() so concurrent refreshes don't race.
        self._updating_lock = threading.Lock()
        self._current_user = None
        # Full listings are only fetched after items() has been called once.
        self._full_listing = False
        self.storage_classes = storage_classes
        self.recursively_contained = False

        # Filter groups can contain themselves, which causes tools
        # that walk the filesystem to get stuck in an infinite loop,
        # so suppress returning a listing in that case.
        if self.project_object.get("group_class") == "filter":
            iter_parent_inode = parent_inode
            while iter_parent_inode != llfuse.ROOT_INODE:
                parent_dir = self.inodes[iter_parent_inode]
                if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
                    self.recursively_contained = True
                    break
                iter_parent_inode = parent_dir.parent_inode

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Build the fuse node (directory or file) for one API record i.

        Returns None for records that should not appear in the listing.
        """
        common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(*common_args, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(*common_args, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        # A readdir means the caller wants everything; start maintaining
        # the full listing from now on.
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the filename to use for API record i, or None to skip it."""
        if 'name' in i:
            if i['name'] is None or len(i['name']) == 0:
                return None
            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
                # collection or subproject
                return i['name']
            elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                # name link
                return i['name']
            elif 'kind' in i and i['kind'].startswith('arvados#'):
                # something else: suffix the name with the object kind
                return "{}.{}".format(i['name'], i['kind'][8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and (when fully listed) its contents."""
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        # Self-containing filter groups never list; partial listings are
        # refreshed lazily through __getitem__ instead.
        if self.recursively_contained or not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#group'],
                        ['groups.group_class', 'in', ['project', 'filter']],
                        *self._filters_for('groups', qualified=True),
                    ],
                ))
                # Only current collection versions are listed.
                contents.extend(obj for obj in arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#collection'],
                        *self._filters_for('collections', qualified=True),
                    ],
                ) if obj['current_version_uuid'] == obj['uuid'])
            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Instantiate and register the fuse node for API record i as `name`."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Look up entry k, querying the API server on a cache miss."""
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            # Filenames may have had disallowed characters substituted;
            # search for both the literal and the un-substituted name.
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(
                filters=[
                    ["owner_uuid", "=", self.project_uuid],
                    ["group_class", "in", ["project","filter"]],
                    namefilter,
                    *self._filters_for('groups', qualified=False),
                ],
                limit=2,
            ).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(
                    filters=[
                        ["owner_uuid", "=", self.project_uuid],
                        namefilter,
                        *self._filters_for('collections', qualified=False),
                    ],
                    limit=2,
                ).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        # __getitem__ handles both the cached and server-side lookup.
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """True if writes are enabled and the current user may modify this project."""
        if not self._enable_write:
            return False
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        return True

    def clear(self):
        super(ProjectDirectory, self).clear()
        if self.project_object_file is not None:
            self.inodes.del_entry(self.project_object_file)
        self.project_object_file = None

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection named `name` in this project.

        Raises EROFS when the project is not writable, EEXIST on any API
        error (most commonly a name collision).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection named `name` from this project."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        if not isinstance(self[name], CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(self[name]) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection `name_old` from project `src` into this project
        under the name `name_new`."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): this API call runs while holding the global FUSE
        # lock (unlike sibling methods that use llfuse.lock_released) --
        # confirm whether releasing the lock here would be safe.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a websocket event describing a change to a child object.

        create events have a new name but no old name; delete events have
        an old name but no new name; update events have both, and if they
        are equal an unrelated field changed and there is nothing to do.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1354
1355
1356 class SharedDirectory(Directory):
1357     """A special directory that represents users or groups who have shared projects with me."""
1358
    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 exclude, poll=False, poll_time=60, storage_classes=None):
        """parent_inode: inode of the enclosing directory.

        `exclude` is accepted but not used here.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        # Fetched once at construction; update() uses it to skip projects
        # owned by the current user.
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the `poll` parameter is ignored -- polling is
        # unconditionally enabled.  Confirm whether that is intentional.
        self._poll = True
        self._poll_time = poll_time
        # Serializes update() across threads.
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes
1369
    @use_counter
    def update(self):
        """Rebuild the listing of users/groups who share projects with me.

        Uses the server-side groups().shared endpoint when the discovery
        document advertises it; otherwise reconstructs the same information
        from a full project listing.  Errors are logged and swallowed so a
        failed refresh leaves the previous listing in place.
        """
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                # display name -> API record for each top-level entry.
                contents = {}
                # uuids of shared projects reachable from the top level.
                roots = []
                # uuids of the owners of those root projects.
                root_owners = set()
                # uuid -> API record for everything fetched below.
                objects = {}

                # Prefer the dedicated "shared" groups endpoint when the
                # API server's discovery document advertises it.
                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    page = []
                    while True:
                        resp = self.api.groups().shared(
                            filters=[
                                ['group_class', 'in', ['project','filter']],
                                *page,
                                *self._filters_for('groups', qualified=False),
                            ],
                            order="uuid",
                            limit=10000,
                            count="none",
                            include="owner_uuid",
                        ).execute()
                        if not resp["items"]:
                            break
                        # Keyset pagination on uuid: the next page starts
                        # after the last uuid seen in this one.
                        page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    # Fallback: list every visible project and derive the
                    # shared roots and their owners by hand.
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['group_class', 'in', ['project','filter']],
                            *self._filters_for('groups', qualified=False),
                        ],
                        select=["uuid", "owner_uuid"],
                    ))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A project is a "root" if the current user doesn't own
                    # it and its owner isn't itself a visible project.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    # Fetch the owner records, which may be users or groups.
                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['uuid', 'in', list(root_owners)],
                            *self._filters_for('users', qualified=False),
                        ],
                    )
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['uuid', 'in', list(root_owners)+roots],
                            *self._filters_for('groups', qualified=False),
                        ],
                    )
                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                # Owners become top-level entries, named by group name or
                # by the user's "First Last".
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Projects whose owner record isn't visible appear directly.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(
                contents.items(),
                lambda i: i[0],
                lambda a, i: a.uuid() == i[1]['uuid'],
                lambda i: ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    i[1],
                    poll=self._poll,
                    poll_time=self._poll_time,
                    storage_classes=self.storage_classes,
                ),
            )
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()
1487
1488     def want_event_subscribe(self):
1489         return True