# git.arvados.org - arvados.git - services/fuse/arvados_fuse/fusedir.py
# 22420: Check if file needs kernel cache refresh on open
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")

    def __init__(self, parent_inode, inodes, enable_write, filters):
        """parent_inode is the integer inode number"""

        super(Directory, self).__init__()

        # The inode is assigned later (by Inodes.add_entry); None until then.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write
        self._filters = filters or []

    def _filters_for(self, subtype, *, qualified):
        """Yield the API record filters that apply to entries of `subtype`.

        A filter whose attribute has no 'subtype.' prefix applies to every
        subtype and is yielded unchanged.  A filter qualified with a
        different subtype is skipped.  When `qualified` is false, the
        'subtype.' prefix is stripped from the yielded filter.
        """
        for f in self._filters:
            f_type, _, f_name = f[0].partition('.')
            if not f_name:
                # Unqualified attribute: applies to all subtypes.
                yield f
            elif f_type != subtype:
                pass
            elif qualified:
                yield f
            else:
                yield [f_name, *f[1:]]

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.inodes.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config."""
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.inodes.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        # A generic directory has no persistent backing record.
        return False

    def checkupdate(self):
        """Refresh directory contents if stale.

        HTTP errors from the API are logged and swallowed so a transient
        failure does not propagate to the FUSE operation that triggered
        the check.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn() is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def objsize(self):
        # Rough estimate of memory footprint based on using pympler
        return len(self._entries) * 1024

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        Arguments:
        * items: Iterable --- New directory contents

        * fn: Callable --- Takes an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        * same: Callable --- Compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        * new_entry: Callable --- Create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name in oldentries:
                ent = oldentries[name]
                if same(ent, i) and ent.parent_inode == self.inode:
                    # move existing directory entry over
                    self._entries[name] = ent
                    del oldentries[name]
                    self.inodes.inode_cache.touch(ent)

        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name not in self._entries:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    # need to invalidate this just in case there was a
                    # previous entry that couldn't be moved over or a
                    # lookup that returned file not found and cached
                    # a negative result
                    self.inodes.invalidate_entry(self, name)
                    changed = True
                    # Log inside the guard: when new_entry() returns None,
                    # the eager ent.inode argument would otherwise raise
                    # AttributeError regardless of the log level.
                    _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)

        # delete any other directory entries that were not in found in 'items'
        for name, ent in oldentries.items():
            _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self._mtime = time.time()
            self.inodes.inode_cache.update_cache_size(self)

        self.fresh()

    def in_use(self):
        # In use if the directory itself, or any entry within it, is in use.
        if super(Directory, self).in_use():
            return True
        for v in self._entries.values():
            if v.in_use():
                return True
        return False

    def clear(self):
        """Delete all entries"""
        if not self._entries:
            return
        oldentries = self._entries
        self._entries = {}
        self.invalidate()
        for name, ent in oldentries.items():
            ent.clear()
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
        self.inodes.inode_cache.update_cache_size(self)

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        if self.parent_inode in self.inodes:
            parent = self.inodes[self.parent_inode]
        else:
            # parent was removed already.
            return

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        # Read-only by default; subclasses override.
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
275
276
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only the underlying Arvados `Collection` object.  The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    __slots__ = ("collection", "collection_root", "collection_record_file")

    def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
        self.collection = collection
        # The CollectionDirectory (or TmpCollectionDirectory) at the root of
        # this collection's tree; flush() delegates to it.
        self.collection_root = collection_root
        # Lazily-created .arvados#collection file inode (see subclasses).
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the FUSE entry for `item` under `name`.

        If `item` already carries a fuse_entry (e.g. it was detached during
        an update), re-parent that entry instead of creating a new inode.
        Subcollections become CollectionDirectoryBase children; anything
        else becomes a FuseArvadosFile.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.parent_inode is not None:
                raise Exception("Can only reparent unparented inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.parent_inode = self.inode
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
                self.inode,
                self.inodes,
                self._enable_write,
                self._filters,
                item,
                self.collection_root,
            ))
            self._entries[name].populate(mtime)
        else:
            # NOTE(review): _poll/_poll_time are not in __slots__ here —
            # presumably inherited state set by FreshBase or a subclass.
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write, self._poll, self._poll_time))
        # Back-reference so a later update can re-attach this entry.
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            #
            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock.  Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock.  If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            #
            lockcount = 0
            try:
                # RLock.release() raises RuntimeError once this thread no
                # longer holds the lock, terminating the loop.
                while True:
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                pass

            try:
                with llfuse.lock:
                    with self.collection.lock:
                        if event == arvados.collection.ADD:
                            self.new_entry(name, item, self.mtime())
                        elif event == arvados.collection.DEL:
                            ent = self._entries[name]
                            del self._entries[name]
                            self.inodes.invalidate_entry(self, name)
                            self.inodes.del_entry(ent)
                        elif event == arvados.collection.MOD:
                            # MOD events have (modified_from, newitem)
                            newitem = item[1]
                            entry = None
                            if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
                                entry = newitem.fuse_entry
                            elif name in self._entries:
                                entry = self._entries[name]

                            if entry is not None:
                                entry.invalidate()
                                self.inodes.invalidate_inode(entry)

                            if name in self._entries:
                                self.inodes.invalidate_entry(self, name)

                        # we don't care about TOK events, those mean
                        # only token signatures were updated

                        if self.collection_record_file is not None:
                            self.collection_record_file.invalidate()
                            self.inodes.invalidate_inode(self.collection_record_file)
            finally:
                # Restore the RLock to the exact recursion depth we found it at.
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1

    def populate(self, mtime):
        """Build FUSE entries for the collection's current contents and
        subscribe to future change events."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    def writable(self):
        # Writable only when the mount allows writes AND the underlying
        # collection object is writable.
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        # Saving happens at the collection root, not per-subdirectory.
        self.collection_root.flush()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` in this directory."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory `name`."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove file `name` and flush the change to the backend."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove subdirectory `name` and flush the change to the backend."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Rename/move an entry, enforcing POSIX rename semantics when the
        target name already exists (ENOTEMPTY/ENOTDIR/EISDIR)."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        # Cross-mount renames are only supported between collection directories.
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        # Flush both ends so the move is persisted in both collections.
        self.flush()
        src.flush()

    def clear(self):
        """Drop all entries and detach from the Collection's notifications."""
        super(CollectionDirectoryBase, self).clear()
        if self.collection is not None:
            self.collection.unsubscribe()
        self.collection = None

    def objsize(self):
        # objsize for the whole collection is represented at the root,
        # don't double-count it
        return 0
479
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    __slots__ = ("api", "num_retries", "collection_locator",
                 "_manifest_size", "_writable", "_updating_lock")

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
        """Set up a (not yet loaded) collection root.

        collection_record may be a full API record dict, a uuid/portable
        data hash string, or None.  The Collection object itself is
        loaded lazily by update().
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0

        is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)

        if is_uuid:
            # It is a uuid, it may be updated upstream, so recheck it periodically.
            self._poll_time = 15
        else:
            # It is not a uuid.  For immutable collections, collection
            # only needs to be refreshed if it is very long lived
            # (long enough that there's a risk of the blob signatures
            # expiring).
            try:
                self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
            except Exception:
                # Narrowed from a bare 'except:' so SystemExit and
                # KeyboardInterrupt are not swallowed here.
                _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
                self._poll_time = 60*60

        # Only a uuid-addressed collection can be written back.
        self._writable = is_uuid and enable_write
        self._manifest_size = 0
        # Serializes update()/flush() against each other across threads.
        self._updating_lock = threading.Lock()

    def same(self, i):
        """True if API record `i` refers to this collection."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        # Defer to the loaded Collection when available; otherwise fall
        # back to what we inferred from the locator at construction time.
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Save (or refresh) the collection and adopt the resulting record."""
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        # Only uuid-addressed collections can change upstream.
        return (uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        """Adopt a freshly loaded Collection object and populate entries."""
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        """Update cached metadata (mtime, locator, manifest size) from a
        collection API record and invalidate the record file inode."""
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
                          self.collection_record_file.inode)
        self.inodes.update_uuid(self)
        self.inodes.inode_cache.update_cache_size(self)
        self.fresh()

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self):
        """(Re)load the collection from the API/Keep.

        Returns True on success (or when there is nothing to fetch),
        False after logging an error.  Runs the fetch with llfuse.lock
        released, guarded by _updating_lock so only one thread updates
        at a time.
        """
        try:
            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            new_collection_record = None
            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed us while we waited.
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def collection_record(self):
        """Return the up-to-date API record for the collection."""
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        # The virtual record file always "exists".
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is a rough guess of the amount of overhead involved for
        # a collection; the assumptions are that that each file
        # averages 128 bytes in the manifest, but consume 1024 bytes
        # of Python data structures, so 1024/128=8 means we estimate
        # the RAM footprint at 8 times the size of bare manifest text.
        return self._manifest_size * 8

    def finalize(self):
        """Best-effort save of pending writes, then stop worker threads."""
        if self.collection is None:
            return

        if self.writable():
            try:
                self.collection.save()
            except Exception:
                # exception() logs the traceback; the bound name was unused.
                _logger.exception("Failed to save collection %s", self.collection_locator)
        self.collection.stop_threads()

    def clear(self):
        if self.collection is not None:
            self.collection.stop_threads()
        self._manifest_size = 0
        super(CollectionDirectory, self).clear()
        if self.collection_record_file is not None:
            self.inodes.del_entry(self.collection_record_file)
        self.collection_record_file = None
691
692
693 class TmpCollectionDirectory(CollectionDirectoryBase):
694     """A directory backed by an Arvados collection that never gets saved.
695
696     This supports using Keep as scratch space. A userspace program can
697     read the .arvados#collection file to get a current manifest in
698     order to save a snapshot of the scratch data or use it as a crunch
699     job output.
700     """
701
702     class UnsaveableCollection(arvados.collection.Collection):
703         def save(self):
704             pass
705         def save_new(self):
706             pass
707
708     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
709         collection = self.UnsaveableCollection(
710             api_client=api_client,
711             keep_client=api_client.keep,
712             num_retries=num_retries,
713             storage_classes_desired=storage_classes)
714         # This is always enable_write=True because it never tries to
715         # save to the backend
716         super(TmpCollectionDirectory, self).__init__(
717             parent_inode, inodes, True, filters, collection, self)
718         self.populate(self.mtime())
719
720     def on_event(self, *args, **kwargs):
721         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
722         if self.collection_record_file is None:
723             return
724
725         # See discussion in CollectionDirectoryBase.on_event
726         lockcount = 0
727         try:
728             while True:
729                 self.collection.lock.release()
730                 lockcount += 1
731         except RuntimeError:
732             pass
733
734         try:
735             with llfuse.lock:
736                 with self.collection.lock:
737                     self.collection_record_file.invalidate()
738                     self.inodes.invalidate_inode(self.collection_record_file)
739                     _logger.debug("%s invalidated collection record", self.inode)
740         finally:
741             while lockcount > 0:
742                 self.collection.lock.acquire()
743                 lockcount -= 1
744
745     def collection_record(self):
746         with llfuse.lock_released:
747             return {
748                 "uuid": None,
749                 "manifest_text": self.collection.manifest_text(),
750                 "portable_data_hash": self.collection.portable_data_hash(),
751                 "storage_classes_desired": self.collection.storage_classes_desired(),
752             }
753
754     def __contains__(self, k):
755         return (k == '.arvados#collection' or
756                 super(TmpCollectionDirectory, self).__contains__(k))
757
758     @use_counter
759     def __getitem__(self, item):
760         if item == '.arvados#collection':
761             if self.collection_record_file is None:
762                 self.collection_record_file = FuncToJSONFile(
763                     self.inode, self.collection_record)
764                 self.inodes.add_entry(self.collection_record_file)
765             return self.collection_record_file
766         return super(TmpCollectionDirectory, self).__getitem__(item)
767
768     def persisted(self):
769         return False
770
771     def writable(self):
772         return True
773
774     def flush(self):
775         pass
776
777     def want_event_subscribe(self):
778         return False
779
    def finalize(self):
        # Shut down the collection's background threads when this
        # directory is torn down.
        self.collection.stop_threads()
782
    def invalidate(self):
        # Mark the cached '.arvados#collection' record stale (if it has
        # been created) before invalidating this directory itself.
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
787
788
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
        """Set up the magic directory.

        pdh_only: if True, only portable data hashes (not uuids) are
        accepted as entry names.
        storage_classes: desired storage classes, passed through to the
        project directories created here (and to the nested by_id copy).
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    self.pdh_only,
                    # Fix: propagate storage_classes so collections and
                    # projects reached via by_id honor the mount's
                    # desired storage classes (previously dropped here).
                    self.storage_classes,
                ))

    def __contains__(self, k):
        """Test whether `k` names a reachable collection or project.

        Looking up an unseen-but-valid locator has the side effect of
        loading it as a new entry in this directory.
        """
        if k in self._entries:
            return True

        # Reject names that cannot be a portable data hash, or (unless
        # pdh_only) a uuid, without contacting the server.
        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                # Project (or filter group) uuid: confirm it exists first.
                project = self.api.groups().list(
                    filters=[
                        ['group_class', 'in', ['project','filter']],
                        ["uuid", "=", k],
                        *self._filters_for('groups', qualified=False),
                    ],
                ).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    project[u'items'][0],
                    storage_classes=self.storage_classes,
                ))
            else:
                # Collection uuid or portable data hash.
                e = self.inodes.add_entry(CollectionDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    k,
                ))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Lost a race with another lookup; keep the existing
                    # entry and discard the duplicate.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            # Best-effort lookup: log and report "not present" rather
            # than failing the FUSE operation.
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        # `item in self` has the side effect of loading the entry.
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Entries are created on demand; nothing to clear.
        pass

    def want_event_subscribe(self):
        # uuid entries can change server-side; pdh entries are immutable.
        return not self.pdh_only
907
908
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        # Polling is always enabled for tag listings.
        self._poll = True
        self._poll_time = poll_time
        # Tag names found via explicit lookup (__getitem__) that should
        # stay visible in the merged listing produced by update().
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the set of tag subdirectories from the API server."""
        with llfuse.lock_released:
            # Distinct non-empty tag names, capped at 1000; names beyond
            # the cap become visible after an explicit lookup (see
            # __getitem__, which records them in self._extra).
            tags = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '!=', ''],
                    *self._filters_for('links', qualified=False),
                ],
                select=['name'],
                distinct=True,
                limit=1000,
            ).execute(num_retries=self.num_retries)
        if "items" in tags:
            # Merge server results plus remembered extra names into the
            # entry table, creating a TagDirectory per tag name.
            self.merge(
                tags['items']+[{"name": n} for n in self._extra],
                lambda i: i['name'],
                lambda a, i: a.tag == i['name'],
                lambda i: TagDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    i['name'],
                    poll=self._poll,
                    poll_time=self._poll_time,
                ),
            )

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up one tag, asking the API server if it isn't cached."""
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', item],
                    *self._filters_for('links', qualified=False),
                ],
                limit=1,
            ).execute(num_retries=self.num_retries)
        if tags["items"]:
            # Remember the name so future update() calls keep it listed,
            # then refresh so the entry actually appears.
            self._extra.add(item)
            self.update()
        return super(TagsDirectory, self).__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            # May contact the API server via __getitem__.
            self[k]
            return True
        except KeyError:
            pass
        return False
984
985
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        # Tag membership can change server-side at any time.
        return True

    @use_counter
    def update(self):
        """Refresh entries from tag links pointing at collections."""
        with llfuse.lock_released:
            links = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', self.tag],
                    ['head_uuid', 'is_a', 'arvados#collection'],
                    *self._filters_for('links', qualified=False),
                ],
                select=['head_uuid'],
            ).execute(num_retries=self.num_retries)

        def name_of(link):
            # Tagged collections appear under their uuid.
            return link['head_uuid']

        def is_same(entry, link):
            return entry.collection_locator == link['head_uuid']

        def make_entry(link):
            return CollectionDirectory(
                self.inode,
                self.inodes,
                self.api,
                self.num_retries,
                self._enable_write,
                self._filters,
                link['head_uuid'],
            )

        self.merge(links['items'], name_of, is_same, make_entry)
1029
1030
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
                 "project_uuid", "_updating_lock",
                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 project_object, poll=True, poll_time=15, storage_classes=None):
        """Set up a directory view of one project (group or user) record.

        project_object: the API record this directory presents.
        storage_classes: desired storage classes for collections created
        here, passed through to child directories.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Lazily created ObjectFile exposing the raw project record as
        # '.arvados#project' (see update() and __getitem__).
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes update() across threads.
        self._updating_lock = threading.Lock()
        self._current_user = None
        # Set True by items(); update() fetches the full contents
        # listing only when this is set.
        self._full_listing = False
        self.storage_classes = storage_classes
        self.recursively_contained = False

        # Filter groups can contain themselves, which causes tools
        # that walk the filesystem to get stuck in an infinite loop,
        # so suppress returning a listing in that case.
        if self.project_object.get("group_class") == "filter":
            iter_parent_inode = parent_inode
            while iter_parent_inode != llfuse.ROOT_INODE:
                parent_dir = self.inodes[iter_parent_inode]
                if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
                    self.recursively_contained = True
                    break
                iter_parent_inode = parent_dir.parent_inode

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        # Build the appropriate child node for API record `i`,
        # dispatching on the kind of uuid it carries.
        common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(*common_args, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            # Name link: present its target collection, if it is one.
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(*common_args, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # Some other object type: expose its record as a JSON file.
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        # A full readdir was requested; make update() fetch everything.
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        # Map an API record to the filename it should appear under,
        # or None if it should not be listed.
        if 'name' in i:
            if i['name'] is None or len(i['name']) == 0:
                return None
            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
                # collection or subproject
                return i['name']
            elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                # name link
                return i['name']
            elif 'kind' in i and i['kind'].startswith('arvados#'):
                # something else
                return "{}.{}".format(i['name'], i['kind'][8:])
        else:
            return None


    @use_counter
    def update(self):
        """Refresh the project record and, when requested, its contents.

        Returns True on a completed refresh.  NOTE(review): returns None
        (bare return) when not stale -- callers that truth-test the
        result will treat that case as a failed update.
        """
        if self.project_object_file == None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        # Skip the expensive contents listing unless a full readdir was
        # requested; self-containing filter groups never list contents.
        if self.recursively_contained or not self._full_listing:
            return True

        def samefn(a, i):
            # Does existing entry `a` still correspond to record `i`?
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                # Refresh our own record (group or user project).
                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#group'],
                        ['groups.group_class', 'in', ['project', 'filter']],
                        *self._filters_for('groups', qualified=True),
                    ],
                ))
                # Only current collection versions appear as entries.
                contents.extend(obj for obj in arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#collection'],
                        *self._filters_for('collections', qualified=True),
                    ],
                ) if obj['current_version_uuid'] == obj['uuid'])
            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        # Create and register a child node for record `i` under `name`.
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Look up one entry, querying the API server by name on a miss."""
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                # `k` may be the sanitized form of a server-side name;
                # search for both spellings.
                namefilter = ["name", "in", [k, k2]]
            # Try subprojects first, then collections.
            contents = self.api.groups().list(
                filters=[
                    ["owner_uuid", "=", self.project_uuid],
                    ["group_class", "in", ["project","filter"]],
                    namefilter,
                    *self._filters_for('groups', qualified=False),
                ],
                limit=2,
            ).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(
                    filters=[
                        ["owner_uuid", "=", self.project_uuid],
                        namefilter,
                        *self._filters_for('collections', qualified=False),
                    ],
                    limit=2,
                ).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        try:
            # Delegates to __getitem__, which may contact the API server.
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        # Writable only when the mount allows writes AND the server
        # says we can write this project.
        if not self._enable_write:
            return False
        return self.project_object.get("can_write") is True

    def persisted(self):
        return True

    def clear(self):
        # Also drop the '.arvados#project' record file, if created.
        super(ProjectDirectory, self).clear()
        if self.project_object_file is not None:
            self.inodes.del_entry(self.project_object_file)
        self.project_object_file = None

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection named `name` in this project."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                try:
                    self.api.collections().create(body=c).execute(num_retries=self.num_retries)
                except Exception as e:
                    # NOTE(review): this inner try/except re-raises
                    # unconditionally and therefore has no effect.
                    raise
            self.invalidate()
        except apiclient_errors.Error as error:
            # API failure (e.g. name collision) is reported as EEXIST.
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection named `name` from this project."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        if not isinstance(self[name], CollectionDirectory):
            # Only collections may be removed this way.
            raise llfuse.FUSEError(errno.EPERM)
        if len(self[name]) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move/rename a collection from project `src` into this project."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Acually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a websocket event about one object owned by this project."""
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                # Renamed or created: reuse the existing node if we have
                # one, otherwise build a fresh entry from the new record.
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                # Removed or trashed: drop the node entirely.
                self.inodes.del_entry(ent)
1356
1357
1358 class SharedDirectory(Directory):
1359     """A special directory that represents users or groups who have shared projects with me."""
1360
    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 exclude, poll=False, poll_time=60, storage_classes=None):
        """Set up the "shared with me" top-level directory.

        NOTE(review): the `poll` parameter is accepted but ignored
        (polling is forced on below), and `exclude` is accepted but
        never used in this class -- confirm whether callers rely on
        either before changing.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        # Fetched once at mount time; used to tell "mine" from "shared".
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        # Serializes update() across threads.
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes
1371
1372     @use_counter
1373     def update(self):
1374         try:
1375             with llfuse.lock_released:
1376                 self._updating_lock.acquire()
1377                 if not self.stale():
1378                     return
1379
1380                 contents = {}
1381                 roots = []
1382                 root_owners = set()
1383                 objects = {}
1384
1385                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1386                 if 'httpMethod' in methods.get('shared', {}):
1387                     page = []
1388                     while True:
1389                         resp = self.api.groups().shared(
1390                             filters=[
1391                                 ['group_class', 'in', ['project','filter']],
1392                                 *page,
1393                                 *self._filters_for('groups', qualified=False),
1394                             ],
1395                             order="uuid",
1396                             limit=10000,
1397                             count="none",
1398                             include="owner_uuid",
1399                         ).execute()
1400                         if not resp["items"]:
1401                             break
1402                         page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1403                         for r in resp["items"]:
1404                             objects[r["uuid"]] = r
1405                             roots.append(r["uuid"])
1406                         for r in resp["included"]:
1407                             objects[r["uuid"]] = r
1408                             root_owners.add(r["uuid"])
1409                 else:
1410                     all_projects = list(arvados.util.keyset_list_all(
1411                         self.api.groups().list,
1412                         order_key="uuid",
1413                         num_retries=self.num_retries,
1414                         filters=[
1415                             ['group_class', 'in', ['project','filter']],
1416                             *self._filters_for('groups', qualified=False),
1417                         ],
1418                         select=["uuid", "owner_uuid"],
1419                     ))
1420                     for ob in all_projects:
1421                         objects[ob['uuid']] = ob
1422
1423                     current_uuid = self.current_user['uuid']
1424                     for ob in all_projects:
1425                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1426                             roots.append(ob['uuid'])
1427                             root_owners.add(ob['owner_uuid'])
1428
1429                     lusers = arvados.util.keyset_list_all(
1430                         self.api.users().list,
1431                         order_key="uuid",
1432                         num_retries=self.num_retries,
1433                         filters=[
1434                             ['uuid', 'in', list(root_owners)],
1435                             *self._filters_for('users', qualified=False),
1436                         ],
1437                     )
1438                     lgroups = arvados.util.keyset_list_all(
1439                         self.api.groups().list,
1440                         order_key="uuid",
1441                         num_retries=self.num_retries,
1442                         filters=[
1443                             ['uuid', 'in', list(root_owners)+roots],
1444                             *self._filters_for('groups', qualified=False),
1445                         ],
1446                     )
1447                     for l in lusers:
1448                         objects[l["uuid"]] = l
1449                     for l in lgroups:
1450                         objects[l["uuid"]] = l
1451
1452                 for r in root_owners:
1453                     if r in objects:
1454                         obr = objects[r]
1455                         if obr.get("name"):
1456                             contents[obr["name"]] = obr
1457                         elif "first_name" in obr:
1458                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1459
1460                 for r in roots:
1461                     if r in objects:
1462                         obr = objects[r]
1463                         if obr['owner_uuid'] not in objects:
1464                             contents[obr["name"]] = obr
1465
1466             # end with llfuse.lock_released, re-acquire lock
1467
1468             self.merge(
1469                 contents.items(),
1470                 lambda i: i[0],
1471                 lambda a, i: a.uuid() == i[1]['uuid'],
1472                 lambda i: ProjectDirectory(
1473                     self.inode,
1474                     self.inodes,
1475                     self.api,
1476                     self.num_retries,
1477                     self._enable_write,
1478                     self._filters,
1479                     i[1],
1480                     poll=self._poll,
1481                     poll_time=self._poll_time,
1482                     storage_classes=self.storage_classes,
1483                 ),
1484             )
1485         except Exception:
1486             _logger.exception("arv-mount shared dir error")
1487         finally:
1488             self._updating_lock.release()
1489
1490     def want_event_subscribe(self):
1491         return True