17417: Merge branch 'main' into 17417-add-arm64
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
# Logger used by the directory classes defined in this module.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
# The character class contains exactly two characters: NUL and '/'.
_disallowed_filename_characters = re.compile('[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    The mutating operations (create, mkdir, unlink, rmdir, rename) and
    want_event_subscribe raise NotImplementedError here; subclasses
    override the ones they support.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """Create an empty directory.

        :parent_inode: the integer inode number of the containing directory

        :inodes: the inode table; this class calls its touch, add_entry,
        del_entry, invalidate_entry and invalidate_inode methods and indexes
        it by inode number

        :apiconfig: callable returning the API server configuration mapping

        :enable_write: stored as self._enable_write for use by subclasses

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        """Return the string substituted for '/' in names, or None.

        The value is read from the API configuration on first use and cached
        in self._fsns.  None means no substitution is configured.
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                # An empty value or '/' itself cannot act as a substitute.
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.apiconfig.

        None is passed through unchanged.  The empty string and '.' become
        '_', and '..' becomes '__'.  Otherwise '/' is replaced with the
        configured substitution (if any) and every remaining disallowed
        character is replaced with '_'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; the generic directory is not persisted."""
        return False

    def checkupdate(self):
        """Call update() if this directory is stale.

        An HttpError raised by update() is logged as a warning and
        swallowed, leaving the current entries in place.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn() is a deprecated alias of Logger.warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named item; raises KeyError if absent."""
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list snapshot of (name, entry) pairs."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            # Entries were added or removed: invalidate this inode and bump
            # the modification time.
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        for v in self._entries.values():
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """Return True if this directory or any entry reports a reference.

        only_children is passed to the base class check for this object;
        entries are always queried with only_children=False.
        """
        if super(Directory, self).has_ref(only_children):
            return True
        for v in self._entries.values():
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate the parent's directory entry that refers to this object."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        """Return the stored modification time."""
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
256
257
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection):
        """Wrap `collection` as a directory.

        parent_inode, inodes, apiconfig and enable_write are passed through
        to Directory.__init__; `collection` is stored as self.collection.
        """
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.apiconfig = apiconfig
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry for `item` under the sanitized `name`.

        If `item` already carries a fuse_entry, that entry is reused; it
        must be marked dead and still have an inode, otherwise an Exception
        is raised.  If `item` is a RichCollectionBase a new
        CollectionDirectoryBase is created and populated; anything else is
        wrapped in a FuseArvadosFile.  In every case item.fuse_entry is set
        to the resulting entry.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Callback registered with the collection in populate().

        Events for a collection other than self.collection are ignored.
        ADD creates an entry, DEL removes and invalidates the entry, and MOD
        invalidates the inode of the affected entry.
        """
        if collection == self.collection:
            name = self.sanitize_filename(name)

            #
            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock.  Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock.  If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            #
            lockcount = 0
            try:
                while True:
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                # release() raises once the lock is no longer held by us;
                # lockcount now records how many times we held it.
                pass

            try:
                with llfuse.lock:
                    with self.collection.lock:
                        if event == arvados.collection.ADD:
                            self.new_entry(name, item, self.mtime())
                        elif event == arvados.collection.DEL:
                            ent = self._entries[name]
                            del self._entries[name]
                            self.inodes.invalidate_entry(self, name)
                            self.inodes.del_entry(ent)
                        elif event == arvados.collection.MOD:
                            if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                                self.inodes.invalidate_inode(item.fuse_entry)
                            elif name in self._entries:
                                self.inodes.invalidate_inode(self._entries[name])
            finally:
                # Restore the collection lock to the depth it had on entry.
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1

    def populate(self, mtime):
        """Subscribe to collection events and create an entry per item.

        Sets the directory mtime to `mtime`; runs with the collection lock
        held.
        """
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    def writable(self):
        """Return true only if writes are enabled and the collection is writable."""
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection; does nothing when not writable."""
        if not self.writable():
            return
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create `name` by opening it for writing in the collection and closing it.

        Raises llfuse.FUSEError(EROFS) when not writable.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create directory `name` in the collection.

        Raises llfuse.FUSEError(EROFS) when not writable.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush.

        Raises llfuse.FUSEError(EROFS) when not writable.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush.

        Raises llfuse.FUSEError(EROFS) when not writable.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EROFS when not writable, EPERM when
        `src` is not a CollectionDirectoryBase, and ENOTEMPTY, ENOTDIR or
        EISDIR when an existing target is incompatible with the source entry.
        Both this directory and `src` are flushed afterwards.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                # file over file: allowed, the target is overwritten below
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                # directory over directory: only when the target is empty
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
425
426
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        """Create a directory for the collection named by `collection_record`.

        :collection_record: either a collection record dict (its 'uuid' and
        'modified_at' fields are used) or a locator string, or None

        :api: API client; api.config is used as the configuration callable
        and api.keep as the Keep client when loading the collection

        :num_retries: passed to the collection objects created in update()

        :explicit_collection: accepted but not used by this class
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            # Fall back to a fixed interval, but do not swallow
            # KeyboardInterrupt/SystemExit as a bare except would.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot raise AttributeError
        # when the directory was created without a locator.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` matches this locator by uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Return whether the directory accepts writes.

        Uses the loaded collection's own answer when there is one, otherwise
        the value derived from the locator at construction time.
        """
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    def want_event_subscribe(self):
        """Return True if the locator is a uuid; False otherwise, including when there is no locator."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the current contents with `coll_reader`.

        Existing entries are cleared if this directory already has an inode.
        When the record is non-empty, mtime and locator are taken from it and
        any existing record file is updated.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True when the directory is up to date afterwards.  On any
        error the problem is logged, the directory is invalidated and False
        is returned.

        :to_record_version: passed to the collection's known_past_version()
        to skip an update that has already been processed
        """
        try:
            # A record already loaded for a portable data hash locator is
            # not fetched again.
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Refreshed while we waited for the lock.  Report
                        # success like every other up-to-date path instead
                        # of returning None.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    new_collection_record = None
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock
                if (new_collection_record is not None and
                    (self.collection_record is None or
                     self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
                    self.new_collection(new_collection_record, coll_reader)
                    self._manifest_size = len(coll_reader.manifest_text())
                    _logger.debug("%s manifest_size %i", self, self._manifest_size)

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named `item`.

        '.arvados#collection' is a synthetic file exposing the collection
        record; it is created on first access.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when this directory has a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the collection's threads, delete all entries and reset the size estimate."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
603
604
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations are no-ops.

        The overrides accept and ignore any arguments so they stay
        call-compatible with the Collection methods they replace.
        """
        def save(self, *args, **kwargs):
            pass
        def save_new(self, *args, **kwargs):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        """Create an empty scratch collection and populate the directory.

        :storage_classes: passed to the collection as storage_classes_desired

        :enable_write: accepted for signature compatibility; the directory
        is always created writable (see below)
        """
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, collection)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle a collection event, then invalidate the record file if one exists."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file:

            # See discussion in CollectionDirectoryBase.on_event
            lockcount = 0
            try:
                while True:
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                pass

            try:
                with llfuse.lock:
                    with self.collection.lock:
                        self.collection_record_file.invalidate()
                        self.inodes.invalidate_inode(self.collection_record_file)
                        _logger.debug("%s invalidated collection record", self)
            finally:
                # Restore the collection lock to the depth it had on entry.
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1

    def collection_record(self):
        """Return a record dict describing the collection's current state.

        Computed with llfuse.lock released.  'uuid' is always None.
        """
        with llfuse.lock_released:
            return {
                "uuid": None,
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
                "storage_classes_desired": self.collection.storage_classes_desired(),
            }

    def __contains__(self, k):
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    @use_counter
    def __getitem__(self, item):
        """Return the entry named `item`.

        '.arvados#collection' is a synthetic file backed by
        collection_record(); it is created on first access.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        """Stop the collection's threads."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file, if any, then this directory."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
696
697
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        """Create the directory.

        :pdh_only: when true, only portable data hash names are looked up;
        uuid names are reported as absent

        :storage_classes: passed to ProjectDirectory objects created by lookups
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            # Pass storage_classes along so that by_id really is configured
            # the same way as this directory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    self.pdh_only, storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Return True if `k` is an existing entry or can be loaded.

        Names that are neither a portable data hash nor (unless pdh_only) a
        uuid are rejected without contacting the server.  Otherwise a
        ProjectDirectory or CollectionDirectory is created and updated; it is
        kept as an entry on success and discarded on failure.  Any exception
        during the lookup is logged and reported as absence.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # An entry for k appeared in the meantime; keep it and
                    # drop the one just created.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry named `item`, looking it up if needed.

        Raises KeyError when the name cannot be resolved.
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        pass

    def want_event_subscribe(self):
        """Return True unless this directory is restricted to portable data hashes."""
        return not self.pdh_only
791
792
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user.

    Tag names come from links with link_class 'tag'.  Names that were
    requested explicitly and found on the server are remembered in
    _extra so they stay listed after later updates.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self._extra = set()
        self._poll = True
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the tag subdirectories from the API server."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000)
            tags = request.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def same_tag(existing, link):
            return existing.tag == link['name']

        def new_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                link['name'], poll=self._poll, poll_time=self._poll_time)

        # Listed tags plus the ones that were looked up by name earlier.
        wanted = tags['items']+[{"name": n} for n in self._extra]
        self.merge(wanted, tag_name, same_tag, new_tag_directory)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the directory for tag item, querying the server if needed."""
        base = super(TagsDirectory, self)
        if base.__contains__(item):
            return base.__getitem__(item)
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            )
            tags = request.execute(num_retries=self.num_retries)
        if tags["items"]:
            # Remember the tag so update() keeps it, then refresh entries.
            self._extra.add(item)
            self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if k is a known tag or can be found on the server."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
846
847
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.tag = tag
        self.api = api
        self.num_retries = num_retries
        self._poll_time = poll_time
        self._poll = poll

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the collection subdirectories for this tag from the API server."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
            )
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def new_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                       self._enable_write, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name,
                   same_collection,
                   new_collection_directory)
878
879
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are the collections and subprojects (groups with group_class
    'project' or 'filter') found in the project.  Until a full listing is
    requested through items(), entries are looked up lazily by name in
    __getitem__; afterwards update() keeps the whole listing in sync.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        """Record the project and API settings.

        project_object is the API record of the project (it must contain
        'uuid').  poll and poll_time are stored and passed to subproject
        directories.  storage_classes, when not None, is sent as
        storage_classes_desired for collections created by mkdir().
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self._current_user = None
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Build the filesystem object for API record i.

        Returns a CollectionDirectory, ProjectDirectory or ObjectFile
        depending on the kind of UUID in i['uuid'], or None when the record
        cannot be represented (e.g. a link that does not point at a
        collection).
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                    i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        """Return the directory items, switching to full-listing mode."""
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the (unsanitized) entry name for API record i, or None.

        None means the record should not appear in the directory: it has
        no 'name' key, an empty name, or is of a kind that is not
        recognized here.
        """
        if 'name' not in i:
            return None
        if not i['name']:
            return None
        if "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
            # collection or subproject
            return i['name']
        if link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
            # name link
            return i['name']
        if 'kind' in i and i['kind'].startswith('arvados#'):
            # something else: append the kind (minus the "arvados#" prefix)
            return "{}.{}".format(i['name'], i['kind'][8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, in full-listing mode, its entries.

        Always returns True on normal completion, including when there is
        nothing to do (lazy mode, or another thread already refreshed the
        listing), so callers that test the result see a consistent value.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                # Wait for the update lock with the llfuse lock released.
                self._updating_lock.acquire()
                if not self.stale():
                    # Someone else refreshed the listing while we waited.
                    return True

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                        order_key="uuid",
                                                        num_retries=self.num_retries,
                                                        uuid=self.project_uuid,
                                                        filters=[["uuid", "is_a", "arvados#group"],
                                                                 ["groups.group_class", "in", ["project","filter"]]]))
                contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
                                                             order_key="uuid",
                                                             num_retries=self.num_retries,
                                                             uuid=self.project_uuid,
                                                             filters=[["uuid", "is_a", "arvados#collection"]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the object for record i, register it under name and return it."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named k, looking it up on the server in lazy mode.

        Raises KeyError when no matching subproject or collection exists.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            # Subprojects take precedence; collections are only consulted
            # when no group matches.
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if k is '.arvados#project' or resolves via __getitem__."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True

    @use_counter
    @check_update
    def writable(self):
        """Return True if writes are enabled and the current user is listed
        in the project's writable_by."""
        if not self._enable_write:
            return False
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called name in this project.

        Raises FUSEError(EROFS) when the project is not writable and
        FUSEError(EEXIST) when the API request fails with an apiclient
        error.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection called name from this project.

        Raises FUSEError with EROFS (not writable), ENOENT (no such entry),
        EPERM (entry is not a collection) or ENOTEMPTY.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Resolve the entry once, while the llfuse lock is still held.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        collection_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection name_old from project directory src here as name_new.

        Raises FUSEError(EROFS) when this project is not writable and
        FUSEError(EPERM) when src is not a project directory, the entry is
        not a collection, or name_new already exists here.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a change event for an object to this directory's entries.

        ev is an event record; its 'properties' may carry 'old_attributes'
        and 'new_attributes', and 'object_uuid' / 'object_owner_uuid'
        identify the object and its current owner.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    # Renamed within this project: keep the same object.
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1166
1167
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        """Record API settings and fetch the current user record.

        storage_classes is passed on to the ProjectDirectory entries
        created by update().  NOTE(review): the 'exclude' and 'poll'
        arguments are accepted but not used here; polling is always
        enabled.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes

    @use_counter
    def update(self):
        """Rebuild the list of shared project roots and their owners.

        Uses the groups 'shared' API method when the discovery document
        advertises it; otherwise lists all projects and derives the roots
        locally.  Any exception is logged and swallowed, leaving the
        current entries untouched.
        """
        try:
            with llfuse.lock_released:
                # Wait for the update lock with the llfuse lock released.
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    page = []
                    while True:
                        resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                                        order="uuid",
                                                        limit=10000,
                                                        count="none",
                                                        include="owner_uuid").execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        # Next page starts after the last uuid seen.
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['group_class','in',['project','filter']]],
                        select=["uuid", "owner_uuid"]))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A root is a project whose owner is neither the current
                    # user nor another project in the listing.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])

                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                # Owners with a record appear under their name, or under
                # "first last" for records that have first_name instead.
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Roots whose owner record is not available appear directly.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                                  i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        return True