1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, such characters
28 # will appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
32 class Directory(FreshBase):
33     """Generic directory object, backed by a dict.
34
35     Consists of a set of entries with the key representing the filename
36     and the value referencing a File or Directory object.
37     """
38
39     def __init__(self, parent_inode, inodes, enable_write, filters):
40         """parent_inode is the integer inode number"""
41
42         super(Directory, self).__init__()
43
44         self.inode = None
45         if not isinstance(parent_inode, int):
46             raise Exception("parent_inode should be an int")
47         self.parent_inode = parent_inode
48         self.inodes = inodes
49         self._entries = {}
50         self._mtime = time.time()
51         self._enable_write = enable_write
52         self._filters = filters or []
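        # The filters are Arvados API filter expressions such as
        # ['name', 'like', 'sample%'], optionally qualified with a resource
        # type as in 'collections.name'.  _filters_for() below selects and
        # un-qualifies the subset that applies to a given request.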
53
54     def _filters_for(self, subtype, *, qualified):
55         for f in self._filters:
56             f_type, _, f_name = f[0].partition('.')
57             if not f_name:
58                 yield f
59             elif f_type != subtype:
60                 pass
61             elif qualified:
62                 yield f
63             else:
64                 yield [f_name, *f[1:]]
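    # Illustrative behavior of _filters_for (hypothetical filter values):
    # with self._filters == [['collections.name', 'like', 'sample%'],
    #                        ['group_class', '=', 'project']],
    # _filters_for('collections', qualified=False) yields
    # ['name', 'like', 'sample%'] and ['group_class', '=', 'project'],
    # while _filters_for('groups', qualified=False) yields only the
    # unqualified ['group_class', '=', 'project'].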
65
66     def unsanitize_filename(self, incoming):
67         """Replace ForwardSlashNameSubstitution value with /"""
68         fsns = self.inodes.forward_slash_subst()
69         if isinstance(fsns, str):
70             return incoming.replace(fsns, '/')
71         else:
72             return incoming
73
74     def sanitize_filename(self, dirty):
75         """Replace disallowed filename characters, honoring the cluster's
76         ForwardSlashNameSubstitution configuration."""
77         # '.' and '..' are not reachable if API server is newer than #6277
78         if dirty is None:
79             return None
80         elif dirty == '':
81             return '_'
82         elif dirty == '.':
83             return '_'
84         elif dirty == '..':
85             return '__'
86         else:
87             fsns = self.inodes.forward_slash_subst()
88             if isinstance(fsns, str):
89                 dirty = dirty.replace('/', fsns)
90             return _disallowed_filename_characters.sub('_', dirty)
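    # Illustrative results (assuming ForwardSlashNameSubstitution is not
    # configured, so slashes are treated as disallowed characters):
    #   sanitize_filename('')       -> '_'
    #   sanitize_filename('..')     -> '__'
    #   sanitize_filename('a/b')    -> 'a_b'
    #   sanitize_filename('a\x00b') -> 'a_b'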
91
92
93     #  Overridden by subclasses to implement logic to update the
94     #  entries dict when the directory is stale
95     @use_counter
96     def update(self):
97         pass
98
99     # Only used when computing the size of the disk footprint of the directory
100     # (stub)
101     def size(self):
102         return 0
103
104     def persisted(self):
105         return False
106
107     def checkupdate(self):
108         if self.stale():
109             try:
110                 self.update()
111             except apiclient.errors.HttpError as e:
112                 _logger.warning(e)
113
114     @use_counter
115     @check_update
116     def __getitem__(self, item):
117         return self._entries[item]
118
119     @use_counter
120     @check_update
121     def items(self):
122         return list(self._entries.items())
123
124     @use_counter
125     @check_update
126     def __contains__(self, k):
127         return k in self._entries
128
129     @use_counter
130     @check_update
131     def __len__(self):
132         return len(self._entries)
133
134     def fresh(self):
135         self.inodes.touch(self)
136         super(Directory, self).fresh()
137
138     def objsize(self):
139         # This is a very rough guess of the amount of overhead involved for
140         # each directory entry (128 bytes is 16 * 8-byte pointers).
141         return len(self._entries) * 128
142
143     def merge(self, items, fn, same, new_entry):
144         """Helper method for updating the contents of the directory.
145
146         Takes a list describing the new contents of the directory, reuses
147         entries that are the same in both the old and new lists, creates new
148         entries, and deletes old entries missing from the new list.
149
150         Arguments:
151         * items: Iterable --- New directory contents
152
153         * fn: Callable --- Takes an entry in 'items' and returns the desired file or
154         directory name, or None if this entry should be skipped.
155
156         * same: Callable --- Compare an existing entry (a File or Directory
157         object) with an entry in the items list to determine whether to keep
158         the existing entry.
159
160         * new_entry: Callable --- Create a new directory entry (File or Directory
161         object) from an entry in the items list.
162
163         """
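        # Illustrative call (modeled on TagDirectory.update further below):
        #   self.merge(items,
        #              lambda i: i['head_uuid'],                             # fn
        #              lambda a, i: a.collection_locator == i['head_uuid'],  # same
        #              lambda i: CollectionDirectory(...))                   # new_entry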
164
165         oldentries = self._entries
166         self._entries = {}
167         changed = False
168         for i in items:
169             name = self.sanitize_filename(fn(i))
170             if not name:
171                 continue
172             if name in oldentries:
173                 ent = oldentries[name]
174                 if same(ent, i):
175                     # move existing directory entry over
176                     self._entries[name] = ent
177                     del oldentries[name]
178                     self.inodes.inode_cache.touch(ent)
179
180         for i in items:
181             name = self.sanitize_filename(fn(i))
182             if not name:
183                 continue
184             if name not in self._entries:
185                 # create new directory entry
186                 ent = new_entry(i)
187                 if ent is not None:
188                     self._entries[name] = self.inodes.add_entry(ent)
189                     changed = True
190                     _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
191
192         # delete any other directory entries that were not found in 'items'
193         for i in oldentries:
194             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
195             self.inodes.invalidate_entry(self, i)
196             self.inodes.del_entry(oldentries[i])
197             changed = True
198
199         if changed:
200             self.inodes.invalidate_inode(self)
201             self._mtime = time.time()
202             self.inodes.inode_cache.update_cache_size(self)
203
204         self.fresh()
205
206     def in_use(self):
207         if super(Directory, self).in_use():
208             return True
209         for v in self._entries.values():
210             if v.in_use():
211                 return True
212         return False
213
214     def clear(self):
215         """Delete all entries"""
216         oldentries = self._entries
217         self._entries = {}
218         self.invalidate()
219         for n in oldentries:
220             self.inodes.del_entry(oldentries[n])
221         self.inodes.inode_cache.update_cache_size(self)
222
223     def kernel_invalidate(self):
224         # Invalidating the dentry on the parent implies invalidating all paths
225         # below it as well.
226         if self.parent_inode in self.inodes:
227             parent = self.inodes[self.parent_inode]
228         else:
229             # parent was removed already.
230             return
231
232         # Find self on the parent in order to invalidate this path.
233         # Calling the public items() method might trigger a refresh,
234         # which we definitely don't want, so read the internal dict directly.
235         for k,v in parent._entries.items():
236             if v is self:
237                 self.inodes.invalidate_entry(parent, k)
238                 break
239
240         self.inodes.invalidate_inode(self)
241
242     def mtime(self):
243         return self._mtime
244
245     def writable(self):
246         return False
247
248     def flush(self):
249         pass
250
251     def want_event_subscribe(self):
252         raise NotImplementedError()
253
254     def create(self, name):
255         raise NotImplementedError()
256
257     def mkdir(self, name):
258         raise NotImplementedError()
259
260     def unlink(self, name):
261         raise NotImplementedError()
262
263     def rmdir(self, name):
264         raise NotImplementedError()
265
266     def rename(self, name_old, name_new, src):
267         raise NotImplementedError()
268
269
270 class CollectionDirectoryBase(Directory):
271     """Represent an Arvados Collection as a directory.
272
273     This class is used for Subcollections, and is also the base class for
274     CollectionDirectory, which implements collection loading/saving on
275     Collection records.
276
277     Most operations act only on the underlying Arvados `Collection` object.  The
278     `Collection` object signals via a notify callback to
279     `CollectionDirectoryBase.on_event` that an item was added, removed or
280     modified.  FUSE inodes and directory entries are created, deleted or
281     invalidated in response to these events.
282
283     """
284
285     def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
286         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
287         self.collection = collection
288         self.collection_root = collection_root
289         self.collection_record_file = None
290
291     def new_entry(self, name, item, mtime):
292         name = self.sanitize_filename(name)
293         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
294             if item.fuse_entry.dead is not True:
295                 raise Exception("Can only reparent dead inode entry")
296             if item.fuse_entry.inode is None:
297                 raise Exception("Reparented entry must still have valid inode")
298             item.fuse_entry.dead = False
299             self._entries[name] = item.fuse_entry
300         elif isinstance(item, arvados.collection.RichCollectionBase):
301             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
302                 self.inode,
303                 self.inodes,
304                 self._enable_write,
305                 self._filters,
306                 item,
307                 self.collection_root,
308             ))
309             self._entries[name].populate(mtime)
310         else:
311             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
312         item.fuse_entry = self._entries[name]
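        # The back reference assigned above lets the Collection-layer object
        # find its FUSE entry again, so later MOD/DEL events delivered to
        # on_event() below can invalidate or reparent the right inode.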
313
314     def on_event(self, event, collection, name, item):
315         # These are events from the Collection object (ADD/DEL/MOD)
316         # emitted by operations on the Collection object (like
317         # "mkdirs" or "remove"), and by "update", which we need to
318         # synchronize with our FUSE objects that are assigned inodes.
319         if collection == self.collection:
320             name = self.sanitize_filename(name)
321
322             #
323             # It's possible for another thread to have llfuse.lock and
324             # be waiting on collection.lock.  Meanwhile, we released
325             # llfuse.lock earlier in the stack, but are still holding
326             # on to the collection lock, and now we need to re-acquire
327             # llfuse.lock.  If we don't release the collection lock,
328             # we'll deadlock where we're holding the collection lock
329             # waiting for llfuse.lock and the other thread is holding
330             # llfuse.lock and waiting for the collection lock.
331             #
332             # The correct locking order here is to take llfuse.lock
333             # first, then the collection lock.
334             #
335             # Since collection.lock is an RLock, it might be locked
336             # multiple times, so we need to release it multiple times,
337             # keep a count, then re-lock it the correct number of
338             # times.
339             #
340             lockcount = 0
341             try:
342                 while True:
343                     self.collection.lock.release()
344                     lockcount += 1
345             except RuntimeError:
346                 pass
347
348             try:
349                 with llfuse.lock:
350                     with self.collection.lock:
351                         if event == arvados.collection.ADD:
352                             self.new_entry(name, item, self.mtime())
353                         elif event == arvados.collection.DEL:
354                             ent = self._entries[name]
355                             del self._entries[name]
356                             self.inodes.invalidate_entry(self, name)
357                             self.inodes.del_entry(ent)
358                         elif event == arvados.collection.MOD:
359                             if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
360                                 self.inodes.invalidate_inode(item.fuse_entry)
361                             elif name in self._entries:
362                                 self.inodes.invalidate_inode(self._entries[name])
363
364                         if self.collection_record_file is not None:
365                             self.collection_record_file.invalidate()
366                             self.inodes.invalidate_inode(self.collection_record_file)
367             finally:
368                 while lockcount > 0:
369                     self.collection.lock.acquire()
370                     lockcount -= 1
371
372     def populate(self, mtime):
373         self._mtime = mtime
374         with self.collection.lock:
375             self.collection.subscribe(self.on_event)
376             for entry, item in self.collection.items():
377                 self.new_entry(entry, item, self.mtime())
378
379     def writable(self):
380         return self._enable_write and self.collection.writable()
381
382     @use_counter
383     def flush(self):
384         self.collection_root.flush()
385
386     @use_counter
387     @check_update
388     def create(self, name):
389         if not self.writable():
390             raise llfuse.FUSEError(errno.EROFS)
391         with llfuse.lock_released:
392             self.collection.open(name, "w").close()
393
394     @use_counter
395     @check_update
396     def mkdir(self, name):
397         if not self.writable():
398             raise llfuse.FUSEError(errno.EROFS)
399         with llfuse.lock_released:
400             self.collection.mkdirs(name)
401
402     @use_counter
403     @check_update
404     def unlink(self, name):
405         if not self.writable():
406             raise llfuse.FUSEError(errno.EROFS)
407         with llfuse.lock_released:
408             self.collection.remove(name)
409         self.flush()
410
411     @use_counter
412     @check_update
413     def rmdir(self, name):
414         if not self.writable():
415             raise llfuse.FUSEError(errno.EROFS)
416         with llfuse.lock_released:
417             self.collection.remove(name)
418         self.flush()
419
420     @use_counter
421     @check_update
422     def rename(self, name_old, name_new, src):
423         if not self.writable():
424             raise llfuse.FUSEError(errno.EROFS)
425
426         if not isinstance(src, CollectionDirectoryBase):
427             raise llfuse.FUSEError(errno.EPERM)
428
429         if name_new in self:
430             ent = src[name_old]
431             tgt = self[name_new]
432             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
433                 pass
434             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
435                 if len(tgt) > 0:
436                     raise llfuse.FUSEError(errno.ENOTEMPTY)
437             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
438                 raise llfuse.FUSEError(errno.ENOTDIR)
439             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
440                 raise llfuse.FUSEError(errno.EISDIR)
441
442         with llfuse.lock_released:
443             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
444         self.flush()
445         src.flush()
446
447     def clear(self):
448         super(CollectionDirectoryBase, self).clear()
449         self.collection = None
450
451     def objsize(self):
452         # objsize for the whole collection is represented at the root,
453         # don't double-count it
454         return 0
455
456 class CollectionDirectory(CollectionDirectoryBase):
457     """Represents the root of a directory tree representing a collection."""
458
459     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
460         super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
461         self.api = api
462         self.num_retries = num_retries
463         self._poll = True
464         try:
465             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
466         except Exception:
467             _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
468             self._poll_time = 60*60
469
470         if isinstance(collection_record, dict):
471             self.collection_locator = collection_record['uuid']
472             self._mtime = convertTime(collection_record.get('modified_at'))
473         else:
474             self.collection_locator = collection_record
475             self._mtime = 0
476         self._manifest_size = 0
477         if self.collection_locator:
478             self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
479         self._updating_lock = threading.Lock()
480
481     def same(self, i):
482         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
483
484     def writable(self):
485         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
486
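    # flush() below distinguishes two cases: if the collection has no
    # uncommitted local changes (committed() is True) it just refreshes the
    # record from the API server; otherwise it writes the local changes back
    # with save().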
487     @use_counter
488     def flush(self):
489         if not self.writable():
490             return
491         with llfuse.lock_released:
492             with self._updating_lock:
493                 if self.collection.committed():
494                     self.collection.update()
495                 else:
496                     self.collection.save()
497                 self.new_collection_record(self.collection.api_response())
498
499     def want_event_subscribe(self):
500         return (uuid_pattern.match(self.collection_locator) is not None)
501
502     def new_collection(self, new_collection_record, coll_reader):
503         if self.inode:
504             self.clear()
505         self.collection = coll_reader
506         self.new_collection_record(new_collection_record)
507         self.populate(self.mtime())
508
509     def new_collection_record(self, new_collection_record):
510         if not new_collection_record:
511             raise Exception("invalid new_collection_record")
512         self._mtime = convertTime(new_collection_record.get('modified_at'))
513         self._manifest_size = len(new_collection_record["manifest_text"])
514         self.collection_locator = new_collection_record["uuid"]
515         if self.collection_record_file is not None:
516             self.collection_record_file.invalidate()
517             self.inodes.invalidate_inode(self.collection_record_file)
518             _logger.debug("%s invalidated collection record file", self)
519         self.inodes.inode_cache.update_cache_size(self)
520         self.fresh()
521
522     def uuid(self):
523         return self.collection_locator
524
525     @use_counter
526     def update(self):
527         try:
528             if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
529                 # It's immutable, nothing to update
530                 return True
531
532             if self.collection_locator is None:
533                 # No collection locator to retrieve from
534                 self.fresh()
535                 return True
536
537             new_collection_record = None
538             try:
539                 with llfuse.lock_released:
540                     self._updating_lock.acquire()
541                     if not self.stale():
542                         return True
543
544                     _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
545                     coll_reader = None
546                     if self.collection is not None:
547                         # Already have a collection object
548                         self.collection.update()
549                         new_collection_record = self.collection.api_response()
550                     else:
551                         # Create a new collection object
552                         if uuid_pattern.match(self.collection_locator):
553                             coll_reader = arvados.collection.Collection(
554                                 self.collection_locator, self.api, self.api.keep,
555                                 num_retries=self.num_retries)
556                         else:
557                             coll_reader = arvados.collection.CollectionReader(
558                                 self.collection_locator, self.api, self.api.keep,
559                                 num_retries=self.num_retries)
560                         new_collection_record = coll_reader.api_response() or {}
561                         # If the Collection only exists in Keep, there will be no API
562                         # response.  Fill in the fields we need.
563                         if 'uuid' not in new_collection_record:
564                             new_collection_record['uuid'] = self.collection_locator
565                         if "portable_data_hash" not in new_collection_record:
566                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
567                         if 'manifest_text' not in new_collection_record:
568                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
569                         if 'storage_classes_desired' not in new_collection_record:
570                             new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
571
572                 # end with llfuse.lock_released, re-acquire lock
573
574                 if new_collection_record is not None:
575                     if coll_reader is not None:
576                         self.new_collection(new_collection_record, coll_reader)
577                     else:
578                         self.new_collection_record(new_collection_record)
579
580                 return True
581             finally:
582                 self._updating_lock.release()
583         except arvados.errors.NotFoundError as e:
584             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
585         except arvados.errors.ArgumentError as detail:
586             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
587             if new_collection_record is not None and "manifest_text" in new_collection_record:
588                 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
589         except Exception:
590             _logger.exception("arv-mount %s: error", self.collection_locator)
591             if new_collection_record is not None and "manifest_text" in new_collection_record:
592                 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
593         self.invalidate()
594         return False
595
596     @use_counter
597     def collection_record(self):
598         self.flush()
599         return self.collection.api_response()
600
601     @use_counter
602     @check_update
603     def __getitem__(self, item):
604         if item == '.arvados#collection':
605             if self.collection_record_file is None:
606                 self.collection_record_file = FuncToJSONFile(
607                     self.inode, self.collection_record)
608                 self.inodes.add_entry(self.collection_record_file)
609             self.invalidate()  # use lookup as a signal to force update
610             return self.collection_record_file
611         else:
612             return super(CollectionDirectory, self).__getitem__(item)
613
614     def __contains__(self, k):
615         if k == '.arvados#collection':
616             return True
617         else:
618             return super(CollectionDirectory, self).__contains__(k)
619
620     def invalidate(self):
621         if self.collection_record_file is not None:
622             self.collection_record_file.invalidate()
623             self.inodes.invalidate_inode(self.collection_record_file)
624         super(CollectionDirectory, self).invalidate()
625
626     def persisted(self):
627         return (self.collection_locator is not None)
628
629     def objsize(self):
630         # This is a very rough guess of the amount of overhead
631         # involved for a collection; you've got the manifest text
632         # itself which is not discarded by the Collection class, then
633         # the block identifiers that get copied into their own
634         # strings, then the rest of the overhead of the Python
635         # objects.
636         return self._manifest_size * 4
637
638     def finalize(self):
639         if self.collection is not None:
640             if self.writable():
641                 try:
642                     self.collection.save()
643                 except Exception as e:
644                     _logger.exception("Failed to save collection %s", self.collection_locator)
645             self.collection.stop_threads()
646
647     def clear(self):
648         if self.collection is not None:
649             self.collection.stop_threads()
650         self._manifest_size = 0
651         super(CollectionDirectory, self).clear()
652
653
654 class TmpCollectionDirectory(CollectionDirectoryBase):
655     """A directory backed by an Arvados collection that never gets saved.
656
657     This supports using Keep as scratch space. A userspace program can
658     read the .arvados#collection file to get a current manifest in
659     order to save a snapshot of the scratch data or use it as a crunch
660     job output.
661     """
662
663     class UnsaveableCollection(arvados.collection.Collection):
664         def save(self):
665             pass
666         def save_new(self):
667             pass
668
669     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
670         collection = self.UnsaveableCollection(
671             api_client=api_client,
672             keep_client=api_client.keep,
673             num_retries=num_retries,
674             storage_classes_desired=storage_classes)
675         # This is always enable_write=True because it never tries to
676         # save to the backend
677         super(TmpCollectionDirectory, self).__init__(
678             parent_inode, inodes, True, filters, collection, self)
679         self.populate(self.mtime())
680
681     def on_event(self, *args, **kwargs):
682         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
683         if self.collection_record_file is None:
684             return
685
686         # See discussion in CollectionDirectoryBase.on_event
687         lockcount = 0
688         try:
689             while True:
690                 self.collection.lock.release()
691                 lockcount += 1
692         except RuntimeError:
693             pass
694
695         try:
696             with llfuse.lock:
697                 with self.collection.lock:
698                     self.collection_record_file.invalidate()
699                     self.inodes.invalidate_inode(self.collection_record_file)
700                     _logger.debug("%s invalidated collection record", self)
701         finally:
702             while lockcount > 0:
703                 self.collection.lock.acquire()
704                 lockcount -= 1
705
706     def collection_record(self):
707         with llfuse.lock_released:
708             return {
709                 "uuid": None,
710                 "manifest_text": self.collection.manifest_text(),
711                 "portable_data_hash": self.collection.portable_data_hash(),
712                 "storage_classes_desired": self.collection.storage_classes_desired(),
713             }
714
715     def __contains__(self, k):
716         return (k == '.arvados#collection' or
717                 super(TmpCollectionDirectory, self).__contains__(k))
718
719     @use_counter
720     def __getitem__(self, item):
721         if item == '.arvados#collection':
722             if self.collection_record_file is None:
723                 self.collection_record_file = FuncToJSONFile(
724                     self.inode, self.collection_record)
725                 self.inodes.add_entry(self.collection_record_file)
726             return self.collection_record_file
727         return super(TmpCollectionDirectory, self).__getitem__(item)
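    # Illustrative use from a shell (the mount point path is hypothetical):
    #   cat /mnt/arv-tmp/.arvados#collection
    # returns a JSON record with the current manifest_text of the scratch
    # collection, which a program can save as a regular collection without
    # this mount ever writing to the API server itself.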
728
729     def persisted(self):
730         return False
731
732     def writable(self):
733         return True
734
735     def flush(self):
736         pass
737
738     def want_event_subscribe(self):
739         return False
740
741     def finalize(self):
742         self.collection.stop_threads()
743
744     def invalidate(self):
745         if self.collection_record_file:
746             self.collection_record_file.invalidate()
747         super(TmpCollectionDirectory, self).invalidate()
748
749
750 class MagicDirectory(Directory):
751     """A special directory that logically contains the set of all extant keep locators.
752
753     When a file is referenced by lookup(), it is tested to see if it is a valid
754     keep locator to a manifest, and if so, the manifest contents are loaded as a
755     subdirectory of this directory with the locator as the directory name.
756     Since querying a list of all extant keep locators is impractical, only
757     collections that have already been accessed are visible to readdir().
758
759     """
760
761     README_TEXT = """
762 This directory provides access to Arvados collections as subdirectories listed
763 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
764 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
765 (in the form 'zzzzz-j7d0g-1234567890abcde').
766
767 Note that this directory will appear empty until you attempt to access a
768 specific collection or project subdirectory (such as trying to 'cd' into it),
769 at which point the collection or project will actually be looked up on the server
770 and the directory will appear if it exists.
771
772 """.lstrip()
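    # Illustrative usage from a shell (the mount point path is hypothetical):
    #   ls /mnt/arv/by_id/1f4b0bc7583c2a7f9102c395f4ffc5e3+45    # portable data hash
    #   ls /mnt/arv/by_id/zzzzz-4zz18-1234567890abcde            # collection uuid
    # The first access goes through __contains__ below, which looks the
    # record up on the API server and instantiates a CollectionDirectory or
    # ProjectDirectory on demand.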
773
774     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
775         super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
776         self.api = api
777         self.num_retries = num_retries
778         self.pdh_only = pdh_only
779         self.storage_classes = storage_classes
780
781     def __setattr__(self, name, value):
782         super(MagicDirectory, self).__setattr__(name, value)
783         # When we're assigned an inode, add a README.
784         if ((name == 'inode') and (self.inode is not None) and
785               (not self._entries)):
786             self._entries['README'] = self.inodes.add_entry(
787                 StringFile(self.inode, self.README_TEXT, time.time()))
788             # If we're the root directory, add an identical by_id subdirectory.
789             if self.inode == llfuse.ROOT_INODE:
790                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
791                     self.inode,
792                     self.inodes,
793                     self.api,
794                     self.num_retries,
795                     self._enable_write,
796                     self._filters,
797                     self.pdh_only,
798                 ))
799
800     def __contains__(self, k):
801         if k in self._entries:
802             return True
803
804         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
805             return False
806
807         try:
808             e = None
809
810             if group_uuid_pattern.match(k):
811                 project = self.api.groups().list(
812                     filters=[
813                         ['group_class', 'in', ['project','filter']],
814                         ["uuid", "=", k],
815                         *self._filters_for('groups', qualified=False),
816                     ],
817                 ).execute(num_retries=self.num_retries)
818                 if project[u'items_available'] == 0:
819                     return False
820                 e = self.inodes.add_entry(ProjectDirectory(
821                     self.inode,
822                     self.inodes,
823                     self.api,
824                     self.num_retries,
825                     self._enable_write,
826                     self._filters,
827                     project[u'items'][0],
828                     storage_classes=self.storage_classes,
829                 ))
830             else:
831                 e = self.inodes.add_entry(CollectionDirectory(
832                     self.inode,
833                     self.inodes,
834                     self.api,
835                     self.num_retries,
836                     self._enable_write,
837                     self._filters,
838                     k,
839                 ))
840
841             if e.update():
842                 if k not in self._entries:
843                     self._entries[k] = e
844                 else:
845                     self.inodes.del_entry(e)
846                 return True
847             else:
848                 self.inodes.invalidate_entry(self, k)
849                 self.inodes.del_entry(e)
850                 return False
851         except Exception as ex:
852             _logger.exception("arv-mount lookup '%s':", k)
853             if e is not None:
854                 self.inodes.del_entry(e)
855             return False
856
857     def __getitem__(self, item):
858         if item in self:
859             return self._entries[item]
860         else:
861             raise KeyError("No collection with id " + item)
862
863     def clear(self):
864         pass
865
866     def want_event_subscribe(self):
867         return not self.pdh_only
868
869
870 class TagsDirectory(Directory):
871     """A special directory that contains as subdirectories all tags visible to the user."""
872
873     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
874         super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
875         self.api = api
876         self.num_retries = num_retries
877         self._poll = True
878         self._poll_time = poll_time
879         self._extra = set()
880
881     def want_event_subscribe(self):
882         return True
883
884     @use_counter
885     def update(self):
886         with llfuse.lock_released:
887             tags = self.api.links().list(
888                 filters=[
889                     ['link_class', '=', 'tag'],
890                     ['name', '!=', ''],
891                     *self._filters_for('links', qualified=False),
892                 ],
893                 select=['name'],
894                 distinct=True,
895                 limit=1000,
896             ).execute(num_retries=self.num_retries)
897         if "items" in tags:
898             self.merge(
899                 tags['items']+[{"name": n} for n in self._extra],
900                 lambda i: i['name'],
901                 lambda a, i: a.tag == i['name'],
902                 lambda i: TagDirectory(
903                     self.inode,
904                     self.inodes,
905                     self.api,
906                     self.num_retries,
907                     self._enable_write,
908                     self._filters,
909                     i['name'],
910                     poll=self._poll,
911                     poll_time=self._poll_time,
912                 ),
913             )
914
915     @use_counter
916     @check_update
917     def __getitem__(self, item):
918         if super(TagsDirectory, self).__contains__(item):
919             return super(TagsDirectory, self).__getitem__(item)
920         with llfuse.lock_released:
921             tags = self.api.links().list(
922                 filters=[
923                     ['link_class', '=', 'tag'],
924                     ['name', '=', item],
925                     *self._filters_for('links', qualified=False),
926                 ],
927                 limit=1,
928             ).execute(num_retries=self.num_retries)
929         if tags["items"]:
930             self._extra.add(item)
931             self.update()
932         return super(TagsDirectory, self).__getitem__(item)
933
934     @use_counter
935     @check_update
936     def __contains__(self, k):
937         if super(TagsDirectory, self).__contains__(k):
938             return True
939         try:
940             self[k]
941             return True
942         except KeyError:
943             pass
944         return False
945
946
947 class TagDirectory(Directory):
948     """A special directory that contains as subdirectories all collections visible
949     to the user that are tagged with a particular tag.
950     """
951
952     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
953                  poll=False, poll_time=60):
954         super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
955         self.api = api
956         self.num_retries = num_retries
957         self.tag = tag
958         self._poll = poll
959         self._poll_time = poll_time
960
961     def want_event_subscribe(self):
962         return True
963
964     @use_counter
965     def update(self):
966         with llfuse.lock_released:
967             taggedcollections = self.api.links().list(
968                 filters=[
969                     ['link_class', '=', 'tag'],
970                     ['name', '=', self.tag],
971                     ['head_uuid', 'is_a', 'arvados#collection'],
972                     *self._filters_for('links', qualified=False),
973                 ],
974                 select=['head_uuid'],
975             ).execute(num_retries=self.num_retries)
976         self.merge(
977             taggedcollections['items'],
978             lambda i: i['head_uuid'],
979             lambda a, i: a.collection_locator == i['head_uuid'],
980             lambda i: CollectionDirectory(
981                 self.inode,
982                 self.inodes,
983                 self.api,
984                 self.num_retries,
985                 self._enable_write,
986                 self._filters,
987                 i['head_uuid'],
988             ),
989         )
990
991
992 class ProjectDirectory(Directory):
993     """A special directory that contains the contents of a project."""
994
995     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
996                  project_object, poll=True, poll_time=3, storage_classes=None):
997         super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
998         self.api = api
999         self.num_retries = num_retries
1000         self.project_object = project_object
1001         self.project_object_file = None
1002         self.project_uuid = project_object['uuid']
1003         self._poll = poll
1004         self._poll_time = poll_time
1005         self._updating_lock = threading.Lock()
1006         self._current_user = None
1007         self._full_listing = False
1008         self.storage_classes = storage_classes
1009         self.recursively_contained = False
1010
1011         # Filter groups can contain themselves, which causes tools
1012         # that walk the filesystem to get stuck in an infinite loop,
1013         # so suppress returning a listing in that case.
1014         if self.project_object.get("group_class") == "filter":
1015             iter_parent_inode = parent_inode
1016             while iter_parent_inode != llfuse.ROOT_INODE:
1017                 parent_dir = self.inodes[iter_parent_inode]
1018                 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1019                     self.recursively_contained = True
1020                     break
1021                 iter_parent_inode = parent_dir.parent_inode
1022
1023     def want_event_subscribe(self):
1024         return True
1025
1026     def createDirectory(self, i):
1027         common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1028         if collection_uuid_pattern.match(i['uuid']):
1029             return CollectionDirectory(*common_args, i)
1030         elif group_uuid_pattern.match(i['uuid']):
1031             return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1032         elif link_uuid_pattern.match(i['uuid']):
1033             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1034                 return CollectionDirectory(*common_args, i['head_uuid'])
1035             else:
1036                 return None
1037         elif uuid_pattern.match(i['uuid']):
1038             return ObjectFile(self.parent_inode, i)
1039         else:
1040             return None
1041
1042     def uuid(self):
1043         return self.project_uuid
1044
1045     def items(self):
1046         self._full_listing = True
1047         return super(ProjectDirectory, self).items()
1048
1049     def namefn(self, i):
1050         if 'name' in i:
1051             if i['name'] is None or len(i['name']) == 0:
1052                 return None
1053             elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1054                 # collection or subproject
1055                 return i['name']
1056             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1057                 # name link
1058                 return i['name']
1059             elif 'kind' in i and i['kind'].startswith('arvados#'):
1060                 # something else
1061                 return "{}.{}".format(i['name'], i['kind'][8:])
1062         else:
1063             return None
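    # Illustrative namefn results (hypothetical records):
    #   {'uuid': 'zzzzz-4zz18-1234567890abcde', 'name': 'raw reads'}
    #       -> 'raw reads'              (collection: name used as-is)
    #   {'uuid': 'zzzzz-xvhdp-1234567890abcde', 'name': 'a',
    #    'kind': 'arvados#containerRequest'}
    #       -> 'a.containerRequest'     (other object: kind suffix appended)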
1064
1065
1066     @use_counter
1067     def update(self):
1068         if self.project_object_file is None:
1069             self.project_object_file = ObjectFile(self.inode, self.project_object)
1070             self.inodes.add_entry(self.project_object_file)
1071
1072         if self.recursively_contained or not self._full_listing:
1073             return True
1074
1075         def samefn(a, i):
1076             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1077                 return a.uuid() == i['uuid']
1078             elif isinstance(a, ObjectFile):
1079                 return a.uuid() == i['uuid'] and not a.stale()
1080             return False
1081
1082         try:
1083             with llfuse.lock_released:
1084                 self._updating_lock.acquire()
1085                 if not self.stale():
1086                     return
1087
1088                 if group_uuid_pattern.match(self.project_uuid):
1089                     self.project_object = self.api.groups().get(
1090                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
1091                 elif user_uuid_pattern.match(self.project_uuid):
1092                     self.project_object = self.api.users().get(
1093                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
1094                 # do this in 2 steps until #17424 is fixed
1095                 contents = list(arvados.util.keyset_list_all(
1096                     self.api.groups().contents,
1097                     order_key='uuid',
1098                     num_retries=self.num_retries,
1099                     uuid=self.project_uuid,
1100                     filters=[
1101                         ['uuid', 'is_a', 'arvados#group'],
1102                         ['groups.group_class', 'in', ['project', 'filter']],
1103                         *self._filters_for('groups', qualified=True),
1104                     ],
1105                 ))
1106                 contents.extend(obj for obj in arvados.util.keyset_list_all(
1107                     self.api.groups().contents,
1108                     order_key='uuid',
1109                     num_retries=self.num_retries,
1110                     uuid=self.project_uuid,
1111                     filters=[
1112                         ['uuid', 'is_a', 'arvados#collection'],
1113                         *self._filters_for('collections', qualified=True),
1114                     ],
1115                 ) if obj['current_version_uuid'] == obj['uuid'])
1116             # end with llfuse.lock_released, re-acquire lock
1117
1118             self.merge(contents,
1119                        self.namefn,
1120                        samefn,
1121                        self.createDirectory)
1122             return True
1123         finally:
1124             self._updating_lock.release()
1125
1126     def _add_entry(self, i, name):
1127         ent = self.createDirectory(i)
1128         self._entries[name] = self.inodes.add_entry(ent)
1129         return self._entries[name]
1130
1131     @use_counter
1132     @check_update
1133     def __getitem__(self, k):
1134         if k == '.arvados#project':
1135             return self.project_object_file
1136         elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1137             return super(ProjectDirectory, self).__getitem__(k)
1138         with llfuse.lock_released:
1139             k2 = self.unsanitize_filename(k)
1140             if k2 == k:
1141                 namefilter = ["name", "=", k]
1142             else:
1143                 namefilter = ["name", "in", [k, k2]]
1144             contents = self.api.groups().list(
1145                 filters=[
1146                     ["owner_uuid", "=", self.project_uuid],
1147                     ["group_class", "in", ["project","filter"]],
1148                     namefilter,
1149                     *self._filters_for('groups', qualified=False),
1150                 ],
1151                 limit=2,
1152             ).execute(num_retries=self.num_retries)["items"]
1153             if not contents:
1154                 contents = self.api.collections().list(
1155                     filters=[
1156                         ["owner_uuid", "=", self.project_uuid],
1157                         namefilter,
1158                         *self._filters_for('collections', qualified=False),
1159                     ],
1160                     limit=2,
1161                 ).execute(num_retries=self.num_retries)["items"]
1162         if contents:
1163             if len(contents) > 1 and contents[1]['name'] == k:
1164                 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1165                 # "foo[SUBST]bar".
1166                 contents = [contents[1]]
1167             name = self.sanitize_filename(self.namefn(contents[0]))
1168             if name != k:
1169                 raise KeyError(k)
1170             return self._add_entry(contents[0], name)
1171
1172         # Didn't find item
1173         raise KeyError(k)
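    # Example of the name-substitution lookup above (hypothetical names):
    # with ForwardSlashNameSubstitution set to '_', a lookup of 'foo_bar'
    # queries the API for names 'foo_bar' and 'foo/bar'; if both exist, the
    # record literally named 'foo_bar' wins, matching the comment above.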
1174
1175     def __contains__(self, k):
1176         if k == '.arvados#project':
1177             return True
1178         try:
1179             self[k]
1180             return True
1181         except KeyError:
1182             pass
1183         return False
1184
1185     @use_counter
1186     @check_update
1187     def writable(self):
1188         if not self._enable_write:
1189             return False
1190         with llfuse.lock_released:
1191             if not self._current_user:
1192                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1193             return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1194
1195     def persisted(self):
1196         return True
1197
1198     @use_counter
1199     @check_update
1200     def mkdir(self, name):
1201         if not self.writable():
1202             raise llfuse.FUSEError(errno.EROFS)
1203
1204         try:
1205             with llfuse.lock_released:
1206                 c = {
1207                     "owner_uuid": self.project_uuid,
1208                     "name": name,
1209                     "manifest_text": "" }
1210                 if self.storage_classes is not None:
1211                     c["storage_classes_desired"] = self.storage_classes
1213                 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1216             self.invalidate()
1217         except apiclient_errors.Error as error:
1218             _logger.error(error)
1219             raise llfuse.FUSEError(errno.EEXIST)
1220
1221     @use_counter
1222     @check_update
1223     def rmdir(self, name):
1224         if not self.writable():
1225             raise llfuse.FUSEError(errno.EROFS)
1226
1227         if name not in self:
1228             raise llfuse.FUSEError(errno.ENOENT)
1229         if not isinstance(self[name], CollectionDirectory):
1230             raise llfuse.FUSEError(errno.EPERM)
1231         if len(self[name]) > 0:
1232             raise llfuse.FUSEError(errno.ENOTEMPTY)
1233         with llfuse.lock_released:
1234             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1235         self.invalidate()
1236
1237     @use_counter
1238     @check_update
1239     def rename(self, name_old, name_new, src):
1240         if not self.writable():
1241             raise llfuse.FUSEError(errno.EROFS)
1242
1243         if not isinstance(src, ProjectDirectory):
1244             raise llfuse.FUSEError(errno.EPERM)
1245
1246         ent = src[name_old]
1247
1248         if not isinstance(ent, CollectionDirectory):
1249             raise llfuse.FUSEError(errno.EPERM)
1250
1251         if name_new in self:
1252             # POSIX semantics for replacing one directory with another is
1253             # tricky (the target directory must be empty, the operation must be
1254             # atomic which isn't possible with the Arvados API as of this
1255             # writing) so don't support that.
1256             raise llfuse.FUSEError(errno.EPERM)
1257
1258         self.api.collections().update(uuid=ent.uuid(),
1259                                       body={"owner_uuid": self.uuid(),
1260                                             "name": name_new}).execute(num_retries=self.num_retries)
1261
1262         # Actually move the entry from the source directory to this directory.
1263         del src._entries[name_old]
1264         self._entries[name_new] = ent
1265         self.inodes.invalidate_entry(src, name_old)
1266
1267     @use_counter
1268     def child_event(self, ev):
1269         properties = ev.get("properties") or {}
1270         old_attrs = properties.get("old_attributes") or {}
1271         new_attrs = properties.get("new_attributes") or {}
1272         old_attrs["uuid"] = ev["object_uuid"]
1273         new_attrs["uuid"] = ev["object_uuid"]
1274         old_name = self.sanitize_filename(self.namefn(old_attrs))
1275         new_name = self.sanitize_filename(self.namefn(new_attrs))
1276
1277         # create events will have a new name, but not an old name
1278         # delete events will have an old name, but not a new name
1279         # update events will have an old and new name, and they may be the same or different
1280         # if they are the same, an unrelated field changed and there is nothing to do.
1281
1282         if old_attrs.get("owner_uuid") != self.project_uuid:
1283             # Was moved from somewhere else, so don't try to remove entry.
1284             old_name = None
1285         if ev.get("object_owner_uuid") != self.project_uuid:
1286             # Was moved to somewhere else, so don't try to add entry
1287             new_name = None
1288
1289         if old_attrs.get("is_trashed"):
1290             # Was previously deleted
1291             old_name = None
1292         if new_attrs.get("is_trashed"):
1293             # Has been deleted
1294             new_name = None
1295
1296         if new_name != old_name:
1297             ent = None
1298             if old_name in self._entries:
1299                 ent = self._entries[old_name]
1300                 del self._entries[old_name]
1301                 self.inodes.invalidate_entry(self, old_name)
1302
1303             if new_name:
1304                 if ent is not None:
1305                     self._entries[new_name] = ent
1306                 else:
1307                     self._add_entry(new_attrs, new_name)
1308             elif ent is not None:
1309                 self.inodes.del_entry(ent)
1310
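    # Illustrative event handled above (abridged, hypothetical values):
    #   {'object_uuid': 'zzzzz-4zz18-1234567890abcde',
    #    'object_owner_uuid': 'zzzzz-j7d0g-1234567890abcde',
    #    'properties': {'old_attributes': {'name': 'old name'},
    #                   'new_attributes': {'name': 'new name'}}}
    # A rename within this project arrives as old_name != new_name and is
    # applied by moving the existing entry; a move out of the project (or a
    # trashed object) clears new_name, so the entry is dropped instead.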
1311
1312 class SharedDirectory(Directory):
1313     """A special directory that represents users or groups who have shared projects with me."""
1314
1315     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1316                  exclude, poll=False, poll_time=60, storage_classes=None):
1317         super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1318         self.api = api
1319         self.num_retries = num_retries
1320         self.current_user = api.users().current().execute(num_retries=num_retries)
1321         self._poll = True
1322         self._poll_time = poll_time
1323         self._updating_lock = threading.Lock()
1324         self.storage_classes = storage_classes
1325
1326     @use_counter
1327     def update(self):
1328         try:
1329             with llfuse.lock_released:
1330                 self._updating_lock.acquire()
1331                 if not self.stale():
1332                     return
1333
1334                 contents = {}
1335                 roots = []
1336                 root_owners = set()
1337                 objects = {}
1338
1339                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1340                 if 'httpMethod' in methods.get('shared', {}):
1341                     page = []
1342                     while True:
1343                         resp = self.api.groups().shared(
1344                             filters=[
1345                                 ['group_class', 'in', ['project','filter']],
1346                                 *page,
1347                                 *self._filters_for('groups', qualified=False),
1348                             ],
1349                             order="uuid",
1350                             limit=10000,
1351                             count="none",
1352                             include="owner_uuid",
1353                         ).execute()
1354                         if not resp["items"]:
1355                             break
1356                         page = [["uuid", ">", resp["items"][-1]["uuid"]]]
1357                         for r in resp["items"]:
1358                             objects[r["uuid"]] = r
1359                             roots.append(r["uuid"])
1360                         for r in resp["included"]:
1361                             objects[r["uuid"]] = r
1362                             root_owners.add(r["uuid"])
1363                 else:
1364                     all_projects = list(arvados.util.keyset_list_all(
1365                         self.api.groups().list,
1366                         order_key="uuid",
1367                         num_retries=self.num_retries,
1368                         filters=[
1369                             ['group_class', 'in', ['project','filter']],
1370                             *self._filters_for('groups', qualified=False),
1371                         ],
1372                         select=["uuid", "owner_uuid"],
1373                     ))
1374                     for ob in all_projects:
1375                         objects[ob['uuid']] = ob
1376
1377                     current_uuid = self.current_user['uuid']
1378                     for ob in all_projects:
1379                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1380                             roots.append(ob['uuid'])
1381                             root_owners.add(ob['owner_uuid'])
1382
1383                     lusers = arvados.util.keyset_list_all(
1384                         self.api.users().list,
1385                         order_key="uuid",
1386                         num_retries=self.num_retries,
1387                         filters=[
1388                             ['uuid', 'in', list(root_owners)],
1389                             *self._filters_for('users', qualified=False),
1390                         ],
1391                     )
1392                     lgroups = arvados.util.keyset_list_all(
1393                         self.api.groups().list,
1394                         order_key="uuid",
1395                         num_retries=self.num_retries,
1396                         filters=[
1397                             ['uuid', 'in', list(root_owners)+roots],
1398                             *self._filters_for('groups', qualified=False),
1399                         ],
1400                     )
1401                     for l in lusers:
1402                         objects[l["uuid"]] = l
1403                     for l in lgroups:
1404                         objects[l["uuid"]] = l
1405
1406                 for r in root_owners:
1407                     if r in objects:
1408                         obr = objects[r]
1409                         if obr.get("name"):
1410                             contents[obr["name"]] = obr
1411                         elif "first_name" in obr:
1412                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1413
1414                 for r in roots:
1415                     if r in objects:
1416                         obr = objects[r]
1417                         if obr['owner_uuid'] not in objects:
1418                             contents[obr["name"]] = obr
1419
1420             # end with llfuse.lock_released, re-acquire lock
1421
1422             self.merge(
1423                 contents.items(),
1424                 lambda i: i[0],
1425                 lambda a, i: a.uuid() == i[1]['uuid'],
1426                 lambda i: ProjectDirectory(
1427                     self.inode,
1428                     self.inodes,
1429                     self.api,
1430                     self.num_retries,
1431                     self._enable_write,
1432                     self._filters,
1433                     i[1],
1434                     poll=self._poll,
1435                     poll_time=self._poll_time,
1436                     storage_classes=self.storage_classes,
1437                 ),
1438             )
1439         except Exception:
1440             _logger.exception("arv-mount shared dir error")
1441         finally:
1442             self._updating_lock.release()
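    # Note on update() above: when the API server advertises the
    # groups().shared() endpoint it is used directly (with include="owner_uuid"
    # and uuid-keyed paging); otherwise the "shared with me" set is
    # reconstructed from plain groups and users listings, which needs several
    # extra requests.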
1443
1444     def want_event_subscribe(self):
1445         return True