]> git.arvados.org - arvados.git/blob - services/fuse/arvados_fuse/fusedir.py
Merge branch '22336-resize-details-panel' into main. Closes #22336
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")

    def __init__(self, parent_inode, inodes, enable_write, filters):
        """parent_inode is the integer inode number"""

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write
        self._filters = filters or []

    def _filters_for(self, subtype, *, qualified):
        """Yield the filters that apply to records of type `subtype`.

        Filters with an unqualified attribute name (no 'type.' prefix)
        always apply.  Qualified filters are yielded only when their
        prefix matches `subtype`: verbatim when `qualified` is true,
        otherwise with the type prefix stripped off.
        """
        for f in self._filters:
            f_type, _, f_name = f[0].partition('.')
            if not f_name:
                # Unqualified filter: applies to every record type.
                yield f
            elif f_type != subtype:
                pass
            elif qualified:
                yield f
            else:
                yield [f_name, *f[1:]]

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.inodes.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config."""
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.inodes.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        # Generic directories are not backed by a persistent record.
        return False

    def checkupdate(self):
        """Refresh the directory contents if they are stale.

        API errors are logged and otherwise ignored, so a transient
        server problem does not break the mount.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn() is a deprecated alias (removed in
                # Python 3.13); use warning() instead.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def objsize(self):
        # Rough estimate of memory footprint based on using pympler
        return len(self._entries) * 1024

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        Arguments:
        * items: Iterable --- New directory contents

        * fn: Callable --- Takes an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        * same: Callable --- Compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        * new_entry: Callable --- Create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name in oldentries:
                ent = oldentries[name]
                if same(ent, i) and ent.parent_inode == self.inode:
                    # move existing directory entry over
                    self._entries[name] = ent
                    del oldentries[name]
                    self.inodes.inode_cache.touch(ent)

        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name not in self._entries:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    # need to invalidate this just in case there was a
                    # previous entry that couldn't be moved over or a
                    # lookup that returned file not found and cached
                    # a negative result
                    self.inodes.invalidate_entry(self, name)
                    changed = True
                    # Log inside the guard: when new_entry() returns
                    # None there is no entry, and ent.inode would raise
                    # AttributeError.
                    _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)

        # delete any other directory entries that were not in found in 'items'
        for name, ent in oldentries.items():
            _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self._mtime = time.time()
            self.inodes.inode_cache.update_cache_size(self)

        self.fresh()

    def in_use(self):
        # A directory is in use if it, or any entry below it, is in use.
        if super(Directory, self).in_use():
            return True
        for v in self._entries.values():
            if v.in_use():
                return True
        return False

    def clear(self):
        """Delete all entries"""
        if not self._entries:
            return
        oldentries = self._entries
        self._entries = {}
        self.invalidate()
        for name, ent in oldentries.items():
            ent.clear()
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
        self.inodes.inode_cache.update_cache_size(self)

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        if self.parent_inode in self.inodes:
            parent = self.inodes[self.parent_inode]
        else:
            # parent was removed already.
            return

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        # Read-only by default; subclasses override.
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
275
276
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only the underlying Arvados `Collection` object.  The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    __slots__ = ("collection", "collection_root", "collection_record_file")

    def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root, poll_time=15):
        # collection: the arvados Collection (or subcollection) object
        #   backing this directory.
        # collection_root: the CollectionDirectory at the root of this
        #   collection's tree (may be self); flush() delegates to it.
        # poll_time: refresh interval, forwarded to file entries created
        #   by new_entry().
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
        self.collection = collection
        self.collection_root = collection_root
        self.collection_record_file = None
        self._poll_time = poll_time

    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the FUSE entry for one collection item.

        If `item` already carries a live fuse_entry (e.g. after a MOD
        event moved it), re-parent that entry instead of allocating a
        new inode.  Subcollections become nested CollectionDirectoryBase
        objects; anything else becomes a FuseArvadosFile.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.parent_inode is not None:
                raise Exception("Can only reparent unparented inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.parent_inode = self.inode
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
                self.inode,
                self.inodes,
                self._enable_write,
                self._filters,
                item,
                self.collection_root,
                poll_time=self._poll_time
            ))
            # Recursively create entries for the subcollection's contents.
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime,
                                                                        self._enable_write,
                                                                        self._poll, self._poll_time))
        # Remember the FUSE entry on the collection item so later events
        # (MOD/reparent) can find it again.
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Notify callback invoked by the Collection object.

        Synchronizes FUSE inodes and directory entries with ADD/DEL/MOD
        events emitted by operations on the Collection (e.g. "mkdirs",
        "remove", "update").
        """

        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection != self.collection:
            return

        name = self.sanitize_filename(name)

        #
        # It's possible for another thread to have llfuse.lock and
        # be waiting on collection.lock.  Meanwhile, we released
        # llfuse.lock earlier in the stack, but are still holding
        # on to the collection lock, and now we need to re-acquire
        # llfuse.lock.  If we don't release the collection lock,
        # we'll deadlock where we're holding the collection lock
        # waiting for llfuse.lock and the other thread is holding
        # llfuse.lock and waiting for the collection lock.
        #
        # The correct locking order here is to take llfuse.lock
        # first, then the collection lock.
        #
        # Since collection.lock is an RLock, it might be locked
        # multiple times, so we need to release it multiple times,
        # keep a count, then re-lock it the correct number of
        # times.
        #
        lockcount = 0
        try:
            while True:
                self.collection.lock.release()
                lockcount += 1
        except RuntimeError:
            # RLock raises RuntimeError once this thread no longer
            # holds it; that's how we know we've fully released it.
            pass

        try:
            with llfuse.lock:
                with self.collection.lock:
                    if event == arvados.collection.ADD:
                        self.new_entry(name, item, self.mtime())
                    elif event == arvados.collection.DEL:
                        ent = self._entries[name]
                        del self._entries[name]
                        self.inodes.invalidate_entry(self, name)
                        self.inodes.del_entry(ent)
                    elif event == arvados.collection.MOD:
                        # MOD events have (modified_from, newitem)
                        newitem = item[1]
                        entry = None
                        if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
                            entry = newitem.fuse_entry
                        elif name in self._entries:
                            entry = self._entries[name]

                        if entry is not None:
                            entry.invalidate()
                            self.inodes.invalidate_inode(entry)

                        if name in self._entries:
                            self.inodes.invalidate_entry(self, name)

                    # TOK and WRITE events just invalidate the
                    # collection record file.

                    if self.collection_record_file is not None:
                        self.collection_record_file.invalidate()
                        self.inodes.invalidate_inode(self.collection_record_file)
        finally:
            # Restore the collection lock to its original recursion depth.
            while lockcount > 0:
                self.collection.lock.acquire()
                lockcount -= 1

    def populate(self, mtime):
        """Build directory entries for the current collection contents.

        Also subscribes on_event() so later changes to the collection
        are mirrored into the FUSE view.
        """
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    def writable(self):
        # Writable only if the mount allows writes and the underlying
        # collection object itself is writable.
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        # Saving happens at the collection root, not per-subdirectory.
        self.collection_root.flush()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` in this directory."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory `name` in this directory."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove file `name` and flush the change."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove subdirectory `name` and flush the change."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Rename/move an entry from directory `src` into this directory.

        Enforces POSIX rename semantics when the target name already
        exists (ENOTEMPTY/ENOTDIR/EISDIR), then delegates to the
        collection-level rename with overwrite.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Drop all entries and detach from the collection object."""
        super(CollectionDirectoryBase, self).clear()
        if self.collection is not None:
            self.collection.unsubscribe()
        self.collection = None

    def objsize(self):
        # objsize for the whole collection is represented at the root,
        # don't double-count it
        return 0
486
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    __slots__ = ("api", "num_retries", "collection_locator",
                 "_manifest_size", "_writable", "_updating_lock")

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write,
                 filters=None, collection_record=None,
                 poll_time=15):
        """Set up the root directory for one collection.

        collection_record may be a full API record (dict), a
        uuid/portable-data-hash string, or None.  The Collection object
        itself is loaded lazily by update().
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0

        is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)

        if is_uuid:
            # It is a uuid, it may be updated upstream, so recheck it periodically.
            self._poll_time = poll_time
        else:
            # It is not a uuid.  For immutable collections, collection
            # only needs to be refreshed if it is very long lived
            # (long enough that there's a risk of the blob signatures
            # expiring).
            try:
                self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
            except Exception:
                # Was a bare 'except:', which would also swallow
                # SystemExit/KeyboardInterrupt; catch only ordinary
                # errors while keeping the fallback behavior.
                _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
                self._poll_time = 60*60

        self._writable = is_uuid and enable_write
        self._manifest_size = 0
        # Serializes update()/flush() so only one thread refreshes or
        # saves the collection at a time.
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if API record `i` refers to this collection."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        # Before the collection object is loaded, fall back to the
        # writability determined from the locator at construction time.
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Save local changes (or pick up remote ones) and refresh the record."""
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    # Nothing local to save; pull any upstream changes.
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        # Only uuid-addressed collections can change upstream.
        return (uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        """Adopt a freshly loaded Collection object and populate entries."""
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        """Update cached metadata from a new API record.

        Refreshes mtime, manifest size estimate and locator, and
        invalidates the .arvados#collection record file if present.
        """
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
                          self.collection_record_file.inode)
        self.inodes.update_uuid(self)
        self.inodes.inode_cache.update_cache_size(self)
        self.fresh()

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self):
        """Fetch or refresh the collection contents from the API/Keep.

        Returns True on success (or when there is nothing to fetch),
        False after logging any error.
        """
        try:
            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            new_collection_record = None
            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed while we waited.
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def collection_record(self):
        """Return the current API record, flushing pending changes first."""
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        # Expose the magic '.arvados#collection' record file alongside
        # the regular collection contents.
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is a rough guess of the amount of overhead involved for
        # a collection; the assumptions are that that each file
        # averages 128 bytes in the manifest, but consume 1024 bytes
        # of Python data structures, so 1024/128=8 means we estimate
        # the RAM footprint at 8 times the size of bare manifest text.
        return self._manifest_size * 8

    def finalize(self):
        """Save pending writes (best effort) and stop background threads."""
        if self.collection is None:
            return

        if self.writable():
            try:
                self.collection.save()
            except Exception as e:
                _logger.exception("Failed to save collection %s", self.collection_locator)
        self.collection.stop_threads()

    def clear(self):
        if self.collection is not None:
            self.collection.stop_threads()
        self._manifest_size = 0
        super(CollectionDirectory, self).clear()
        if self.collection_record_file is not None:
            self.inodes.del_entry(self.collection_record_file)
        self.collection_record_file = None
700
701
702 class TmpCollectionDirectory(CollectionDirectoryBase):
703     """A directory backed by an Arvados collection that never gets saved.
704
705     This supports using Keep as scratch space. A userspace program can
706     read the .arvados#collection file to get a current manifest in
707     order to save a snapshot of the scratch data or use it as a crunch
708     job output.
709     """
710
711     class UnsaveableCollection(arvados.collection.Collection):
712         def save(self):
713             pass
714         def save_new(self):
715             pass
716
717     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
718         collection = self.UnsaveableCollection(
719             api_client=api_client,
720             keep_client=api_client.keep,
721             num_retries=num_retries,
722             storage_classes_desired=storage_classes)
723         # This is always enable_write=True because it never tries to
724         # save to the backend
725         super(TmpCollectionDirectory, self).__init__(
726             parent_inode, inodes, True, filters, collection, self)
727         self._poll = False
728         self.populate(self.mtime())
729
730     def collection_record(self):
731         with llfuse.lock_released:
732             return {
733                 "uuid": None,
734                 "manifest_text": self.collection.manifest_text(),
735                 "portable_data_hash": self.collection.portable_data_hash(),
736                 "storage_classes_desired": self.collection.storage_classes_desired(),
737             }
738
739     def __contains__(self, k):
740         return (k == '.arvados#collection' or
741                 super(TmpCollectionDirectory, self).__contains__(k))
742
743     @use_counter
744     def __getitem__(self, item):
745         if item == '.arvados#collection':
746             if self.collection_record_file is None:
747                 self.collection_record_file = FuncToJSONFile(
748                     self.inode, self.collection_record)
749                 self.inodes.add_entry(self.collection_record_file)
750             return self.collection_record_file
751         return super(TmpCollectionDirectory, self).__getitem__(item)
752
753     def persisted(self):
754         return False
755
756     def writable(self):
757         return True
758
759     def flush(self):
760         pass
761
    def want_event_subscribe(self):
        # No server-side object exists, so there are no events to watch.
        return False
764
    def finalize(self):
        # Stop the collection's background threads when the directory is
        # torn down.
        self.collection.stop_threads()
767
    def invalidate(self):
        # The synthetic record file caches the collection record, so it
        # must be invalidated along with the directory itself.
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
772
773
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 pdh_only=False, storage_classes=None, poll_time=15):
        """Set up the magic directory.

        pdh_only restricts name resolution to portable data hashes (no
        uuid lookups).  Entries are never created here; they appear
        on-demand via __contains__/__getitem__.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes
        # Contents only materialize on lookup, so there is nothing to poll.
        self._poll = False
        self._poll_time = poll_time

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        # (This hook fires on every attribute write, but the guard below
        # limits the side effect to the first time the inode is set.)
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    pdh_only=self.pdh_only,
                    storage_classes=self.storage_classes,
                    poll_time=self._poll_time
                ))

    def __contains__(self, k):
        """Resolve name k, creating and caching its entry on success.

        Note this is a lookup with side effects: on the first successful
        resolution the project/collection directory is added to
        self._entries so later readdir() calls can see it.
        """
        if k in self._entries:
            return True

        # Reject names that cannot be a PDH or (unless pdh_only) a uuid
        # before spending an API round trip on them.
        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[
                        ['group_class', 'in', ['project','filter']],
                        ["uuid", "=", k],
                        *self._filters_for('groups', qualified=False),
                    ],
                ).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    project[u'items'][0],
                    storage_classes=self.storage_classes,
                    poll_time=self._poll_time
                ))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    k,
                    poll_time=self._poll_time
                ))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Lost a race with a concurrent lookup; keep the
                    # existing entry and discard the duplicate.
                    self.inodes.del_entry(e)
                return True
            else:
                # Lookup failed: drop the placeholder entry again.
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        # __contains__ performs the actual lookup and populates
        # self._entries as a side effect.
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Cached lookups are cheap to keep and expensive to redo; never
        # drop them.
        pass

    def want_event_subscribe(self):
        # uuid-named entries can change server-side; PDH-named entries
        # are immutable, so no subscription is needed in pdh_only mode.
        return not self.pdh_only
900
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names discovered via direct lookup (__getitem__) that may
        # not appear in the capped listing below; merged into update().
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        # Release the FUSE lock during the API round trip.
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '!=', ''],
                    *self._filters_for('links', qualified=False),
                ],
                select=['name'],
                distinct=True,
                limit=1000,
            ).execute(num_retries=self.num_retries)
        if "items" in tags:
            # Merge listed tags plus any tags found via direct lookup,
            # creating a TagDirectory per distinct tag name.
            self.merge(
                tags['items']+[{"name": n} for n in self._extra],
                lambda i: i['name'],
                lambda a, i: a.tag == i['name'],
                lambda i: TagDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    i['name'],
                    poll=self._poll,
                    poll_time=self._poll_time,
                ),
            )

    @use_counter
    @check_update
    def __getitem__(self, item):
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        # Not in the cached listing (which is capped at 1000 tags above);
        # ask the API server for this specific tag name.
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', item],
                    *self._filters_for('links', qualified=False),
                ],
                limit=1,
            ).execute(num_retries=self.num_retries)
        if tags["items"]:
            # Remember the name so future update() calls include it,
            # then refresh so the entry exists for the lookup below.
            self._extra.add(item)
            self.update()
        return super(TagsDirectory, self).__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        # Fall back to a direct lookup (EAFP): __getitem__ raises
        # KeyError when the tag does not exist server-side either.
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False
976
977
class TagDirectory(Directory):
    """Directory listing every collection carrying one particular tag.

    Each entry is a CollectionDirectory for a collection that has a
    'tag' link whose name matches this directory's tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.tag = tag
        self.api = api
        self.num_retries = num_retries
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        """Tag links can change server-side, so subscribe to events."""
        return True

    @use_counter
    def update(self):
        """Refresh entries from the tag links visible on the API server."""
        link_filters = [
            ['link_class', '=', 'tag'],
            ['name', '=', self.tag],
            ['head_uuid', 'is_a', 'arvados#collection'],
            *self._filters_for('links', qualified=False),
        ]
        # Drop the FUSE lock for the duration of the API round trip.
        with llfuse.lock_released:
            tag_links = self.api.links().list(
                filters=link_filters,
                select=['head_uuid'],
            ).execute(num_retries=self.num_retries)
        self.merge(
            tag_links['items'],
            lambda item: item['head_uuid'],
            lambda existing, item: existing.collection_locator == item['head_uuid'],
            lambda item: CollectionDirectory(
                self.inode,
                self.inodes,
                self.api,
                self.num_retries,
                self._enable_write,
                self._filters,
                item['head_uuid'],
            ),
        )
1021
1022
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are the collections and subprojects owned by the project,
    fetched lazily: a full listing is only requested after items() has
    been called; single-name lookups go through __getitem__.
    """

    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
                 "project_uuid", "_updating_lock",
                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 project_object, poll_time=15, storage_classes=None):
        """Set up a directory view of one project (group) or user record.

        project_object is the API record whose contents this directory
        lists; the listing itself is fetched by update().
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Lazily-created ObjectFile exposing the raw record as
        # '.arvados#project' (see update()).
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = True
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self._current_user = None
        # Becomes True once items() has been called; until then update()
        # skips the expensive full contents fetch.
        self._full_listing = False
        self.storage_classes = storage_classes
        self.recursively_contained = False

        # Filter groups can contain themselves, which causes tools
        # that walk the filesystem to get stuck in an infinite loop,
        # so suppress returning a listing in that case.
        if self.project_object.get("group_class") == "filter":
            iter_parent_inode = parent_inode
            while iter_parent_inode != llfuse.ROOT_INODE:
                parent_dir = self.inodes[iter_parent_inode]
                if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
                    self.recursively_contained = True
                    break
                iter_parent_inode = parent_dir.parent_inode

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Build the filesystem node for one API record, or None to skip it."""
        common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(*common_args, i, poll_time=self._poll_time)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(*common_args, i, poll_time=self._poll_time,
                                    storage_classes=self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            # Name link: expose the link's target collection.
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(*common_args, i['head_uuid'], poll_time=self._poll_time)
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # Any other Arvados object appears as a JSON file.
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        # A readdir-style listing was requested; from now on update()
        # fetches the full project contents.
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the filename to use for record i, or None to omit it."""
        if 'name' in i:
            if i['name'] is None or len(i['name']) == 0:
                return None
            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
                # collection or subproject
                return i['name']
            elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                # name link
                return i['name']
            elif 'kind' in i and i['kind'].startswith('arvados#'):
                # something else: qualify the name with the object kind
                return "{}.{}".format(i['name'], i['kind'][8:])
        else:
            return None

    @use_counter
    def update(self):
        """Refresh the project record and (if needed) its full listing."""
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        # Skip the expensive contents fetch for self-containing filter
        # groups and until a full listing has actually been requested.
        if self.recursively_contained or not self._full_listing:
            return True

        def samefn(a, i):
            # An existing entry matches record i if it represents the
            # same object (and, for plain object files, is not stale).
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#group'],
                        ['groups.group_class', 'in', ['project', 'filter']],
                        *self._filters_for('groups', qualified=True),
                    ],
                ))
                # Only current collection versions; old versions share a
                # name with the current one and must not appear.
                contents.extend(obj for obj in arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#collection'],
                        *self._filters_for('collections', qualified=True),
                    ],
                ) if obj['current_version_uuid'] == obj['uuid'])
            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        # Create, register, and cache a node for record i under name.
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Look up entry k, querying the API server for single names.

        Raises KeyError when no group or collection in this project
        matches k (after filename sanitization).
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            # k may be a sanitized form of the server-side name; search
            # for both spellings when they differ.
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(
                filters=[
                    ["owner_uuid", "=", self.project_uuid],
                    ["group_class", "in", ["project","filter"]],
                    namefilter,
                    *self._filters_for('groups', qualified=False),
                ],
                limit=2,
            ).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(
                    filters=[
                        ["owner_uuid", "=", self.project_uuid],
                        namefilter,
                        *self._filters_for('collections', qualified=False),
                    ],
                    limit=2,
                ).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        # EAFP: __getitem__ does the API lookup and raises KeyError on
        # a miss.
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        if not self._enable_write:
            return False
        return self.project_object.get("can_write") is True

    def persisted(self):
        return True

    def clear(self):
        super(ProjectDirectory, self).clear()
        if self.project_object_file is not None:
            self.inodes.del_entry(self.project_object_file)
        self.project_object_file = None

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection named *name* in this project.

        Raises FUSEError(EROFS) when not writable, FUSEError(EEXIST)
        when the API server rejects the create (e.g. name collision).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection *name* from this project.

        Raises FUSEError with EROFS/ENOENT/EPERM/ENOTEMPTY as
        appropriate; only empty CollectionDirectory entries may be
        removed this way.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        if not isinstance(self[name], CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(self[name]) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move/rename collection name_old from project *src* into this
        project as name_new.

        Only collection entries between two ProjectDirectory nodes are
        supported; raises FUSEError(EPERM) otherwise.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a websocket event about an object owned by this project.

        Adds, removes, or renames the corresponding directory entry to
        track server-side changes without a full refresh.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1350
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 exclude, poll_time=60, storage_classes=None):
        super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes

    @use_counter
    def update(self):
        """Rebuild the set of shared-project roots and their owners.

        Uses the server's groups.shared endpoint when available,
        otherwise falls back to listing all visible projects and
        computing the roots client-side.
        """
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                # Feature-detect the groups.shared API endpoint from the
                # server's discovery document.
                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    # Paginate by uuid using the last uuid of each page
                    # as the cursor for the next request.
                    # NOTE(review): this execute() call does not pass
                    # num_retries, unlike the fallback path below —
                    # confirm whether that is intentional.
                    page = []
                    while True:
                        resp = self.api.groups().shared(
                            filters=[
                                ['group_class', 'in', ['project','filter']],
                                *page,
                                *self._filters_for('groups', qualified=False),
                            ],
                            order="uuid",
                            limit=10000,
                            count="none",
                            include="owner_uuid",
                        ).execute()
                        if not resp["items"]:
                            break
                        page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    # Fallback: list every visible project, then treat as
                    # a root any project whose owner is neither the
                    # current user nor another visible project.
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['group_class', 'in', ['project','filter']],
                            *self._filters_for('groups', qualified=False),
                        ],
                        select=["uuid", "owner_uuid"],
                    ))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    # Fetch the owner records (users or groups) so we can
                    # name the top-level entries.
                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['uuid', 'in', list(root_owners)],
                            *self._filters_for('users', qualified=False),
                        ],
                    )
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['uuid', 'in', list(root_owners)+roots],
                            *self._filters_for('groups', qualified=False),
                        ],
                    )
                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                # Name owner entries by group name, or "First Last" for
                # user records.
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Root projects whose owner record is not visible appear
                # directly under their own names.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(
                contents.items(),
                lambda i: i[0],
                lambda a, i: a.uuid() == i[1]['uuid'],
                lambda i: ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    i[1],
                    poll_time=self._poll_time,
                    storage_classes=self.storage_classes,
                ),
            )
        except Exception:
            # Best-effort refresh: log and keep the previous contents.
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        return True