1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (Such characters in a collection filename appear as
28 # underscores in the FUSE mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
32 class Directory(FreshBase):
33     """Generic directory object, backed by a dict.
34
35     Consists of a set of entries with the key representing the filename
36     and the value referencing a File or Directory object.
37     """
38
39     def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
40         """parent_inode is the integer inode number"""
41
42         super(Directory, self).__init__()
43
44         self.inode = None
45         if not isinstance(parent_inode, int):
46             raise Exception("parent_inode should be an int")
47         self.parent_inode = parent_inode
48         self.inodes = inodes
49         self.apiconfig = apiconfig
50         self._entries = {}
51         self._mtime = time.time()
52         self._enable_write = enable_write
53         self._filters = filters or []
54
55     def _filters_for(self, subtype, *, qualified):
56         for f in self._filters:
57             f_type, _, f_name = f[0].partition('.')
58             if not f_name:
59                 yield f
60             elif f_type != subtype:
61                 pass
62             elif qualified:
63                 yield f
64             else:
65                 yield [f_name, *f[1:]]
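
    # Illustrative behavior of _filters_for, assuming self._filters holds the
    # hypothetical value
    # [['collections.name', 'like', 'a%'], ['uuid', 'is_a', 'arvados#collection']]:
    #
    #   list(self._filters_for('collections', qualified=False))
    #     -> [['name', 'like', 'a%'], ['uuid', 'is_a', 'arvados#collection']]
    #   list(self._filters_for('groups', qualified=True))
    #     -> [['uuid', 'is_a', 'arvados#collection']]
    #
    # Unqualified filters are passed through to every query; qualified filters
    # apply only to the matching resource type, with the "type." prefix kept
    # when qualified=True and stripped otherwise.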
66
67     def forward_slash_subst(self):
68         if not hasattr(self, '_fsns'):
69             self._fsns = None
70             config = self.apiconfig()
71             try:
72                 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
73             except KeyError:
74                 # old API server with no FSNS config
75                 self._fsns = '_'
76             else:
77                 if self._fsns == '' or self._fsns == '/':
78                     self._fsns = None
79         return self._fsns
80
81     def unsanitize_filename(self, incoming):
82         """Replace ForwardSlashNameSubstitution value with /"""
83         fsns = self.forward_slash_subst()
84         if isinstance(fsns, str):
85             return incoming.replace(fsns, '/')
86         else:
87             return incoming
88
89     def sanitize_filename(self, dirty):
90         """Replace disallowed filename characters according to
91         ForwardSlashNameSubstitution in self.apiconfig."""
92         # '.' and '..' are not reachable if API server is newer than #6277
93         if dirty is None:
94             return None
95         elif dirty == '':
96             return '_'
97         elif dirty == '.':
98             return '_'
99         elif dirty == '..':
100             return '__'
101         else:
102             fsns = self.forward_slash_subst()
103             if isinstance(fsns, str):
104                 dirty = dirty.replace('/', fsns)
105             return _disallowed_filename_characters.sub('_', dirty)
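
    # Illustrative round trips, assuming the cluster's
    # ForwardSlashNameSubstitution is configured as "_" (a hypothetical value):
    #
    #   sanitize_filename("sub/dir")   -> "sub_dir"
    #   sanitize_filename("..")        -> "__"
    #   sanitize_filename("a\x00b")    -> "a_b"
    #   unsanitize_filename("sub_dir") -> "sub/dir"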
106
107
108     #  Overridden by subclasses to implement logic to update the
109     #  entries dict when the directory is stale
110     @use_counter
111     def update(self):
112         pass
113
114     # Only used when computing the size of the disk footprint of the directory
115     # (stub)
116     def size(self):
117         return 0
118
119     def persisted(self):
120         return False
121
122     def checkupdate(self):
123         if self.stale():
124             try:
125                 self.update()
126             except apiclient.errors.HttpError as e:
127                 _logger.warning(e)
128
129     @use_counter
130     @check_update
131     def __getitem__(self, item):
132         return self._entries[item]
133
134     @use_counter
135     @check_update
136     def items(self):
137         return list(self._entries.items())
138
139     @use_counter
140     @check_update
141     def __contains__(self, k):
142         return k in self._entries
143
144     @use_counter
145     @check_update
146     def __len__(self):
147         return len(self._entries)
148
149     def fresh(self):
150         self.inodes.touch(self)
151         super(Directory, self).fresh()
152
153     def merge(self, items, fn, same, new_entry):
154         """Helper method for updating the contents of the directory.
155
156         Takes a list describing the new contents of the directory, reuses
157         entries that are the same in both the old and new lists, creates new
158         entries, and deletes old entries missing from the new list.
159
160         :items: iterable with new directory contents
161
162         :fn: function to take an entry in 'items' and return the desired file or
163         directory name, or None if this entry should be skipped
164
165         :same: function to compare an existing entry (a File or Directory
166         object) with an entry in the items list to determine whether to keep
167         the existing entry.
168
169         :new_entry: function to create a new directory entry (File or Directory
170         object) from an entry in the items list.
171
172         """
173
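        # A minimal sketch of the calling convention (names below are
        # hypothetical; see TagDirectory.update() and ProjectDirectory.update()
        # for real callers):
        #
        #   self.merge(
        #       listing,                                 # new directory contents
        #       lambda i: i['name'],                     # fn: item -> entry name
        #       lambda ent, i: ent.uuid() == i['uuid'],  # same: keep existing entry?
        #       lambda i: CollectionDirectory(self.inode, ...),  # new_entry
        #   )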
174         oldentries = self._entries
175         self._entries = {}
176         changed = False
177         for i in items:
178             name = self.sanitize_filename(fn(i))
179             if not name:
180                 continue
181             if name in oldentries:
182                 ent = oldentries[name]
183                 ent.inc_use()
184                 if same(ent, i):
185                     # move existing directory entry over
186                     self._entries[name] = ent
187                     del oldentries[name]
188
189         for i in items:
190             name = self.sanitize_filename(fn(i))
191             if not name:
192                 continue
193             if name not in self._entries:
194                 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
195                 # create new directory entry
196                 ent = new_entry(i)
197                 if ent is not None:
198                     ent.inc_use()
199                     self._entries[name] = self.inodes.add_entry(ent)
200                     changed = True
201
202         # delete any other directory entries that were not found in 'items'
203         for i in oldentries:
204             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
205             self.inodes.invalidate_entry(self, i)
206             self.inodes.del_entry(oldentries[i])
208             changed = True
209
210         if changed:
211             self.inodes.invalidate_inode(self)
212             self._mtime = time.time()
213
214         self.inodes.inode_cache.cap_cache()
215
216         for ent in self._entries.values():
217             ent.dec_use()
218
219         self.fresh()
220
221     def in_use(self):
222         if super(Directory, self).in_use():
223             return True
224         for v in self._entries.values():
225             if v.in_use():
226                 return True
227         return False
228
229     def has_ref(self, only_children):
230         if super(Directory, self).has_ref(only_children):
231             return True
232         for v in self._entries.values():
233             if v.has_ref(False):
234                 return True
235         return False
236
237     def clear(self):
238         """Delete all entries"""
239         oldentries = self._entries
240         self._entries = {}
241         for n in oldentries:
242             oldentries[n].clear()
243             self.inodes.del_entry(oldentries[n])
244         self.invalidate()
245
246     def kernel_invalidate(self):
247         # Invalidating the dentry on the parent implies invalidating all paths
248         # below it as well.
249         parent = self.inodes[self.parent_inode]
250
251         # Find self on the parent in order to invalidate this path.
252         # Calling the public items() method might trigger a refresh,
253         # which we definitely don't want, so read the internal dict directly.
254         for k,v in parent._entries.items():
255             if v is self:
256                 self.inodes.invalidate_entry(parent, k)
257                 break
258
259     def mtime(self):
260         return self._mtime
261
262     def writable(self):
263         return False
264
265     def flush(self):
266         pass
267
268     def want_event_subscribe(self):
269         raise NotImplementedError()
270
271     def create(self, name):
272         raise NotImplementedError()
273
274     def mkdir(self, name):
275         raise NotImplementedError()
276
277     def unlink(self, name):
278         raise NotImplementedError()
279
280     def rmdir(self, name):
281         raise NotImplementedError()
282
283     def rename(self, name_old, name_new, src):
284         raise NotImplementedError()
285
286
287 class CollectionDirectoryBase(Directory):
288     """Represent an Arvados Collection as a directory.
289
290     This class is used for Subcollections, and is also the base class for
291     CollectionDirectory, which implements collection loading/saving on
292     Collection records.
293
294     Most operations act only on the underlying Arvados `Collection` object.  The
295     `Collection` object signals via a notify callback to
296     `CollectionDirectoryBase.on_event` that an item was added, removed or
297     modified.  FUSE inodes and directory entries are created, deleted or
298     invalidated in response to these events.
299
300     """
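
    # Rough sequence of events (an illustrative sketch, with placeholder names):
    #
    #   d.populate(mtime)                # subscribes d.on_event to the collection
    #   d.collection.mkdirs("sub/dir")   # Collection emits ADD notifications,
    #                                    # which on_event turns into FUSE entries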
301
302     def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
303         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters)
304         self.apiconfig = apiconfig
305         self.collection = collection
306         self.collection_root = collection_root
307         self.collection_record_file = None
308
309     def new_entry(self, name, item, mtime):
310         name = self.sanitize_filename(name)
311         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
312             if item.fuse_entry.dead is not True:
313                 raise Exception("Can only reparent dead inode entry")
314             if item.fuse_entry.inode is None:
315                 raise Exception("Reparented entry must still have valid inode")
316             item.fuse_entry.dead = False
317             self._entries[name] = item.fuse_entry
318         elif isinstance(item, arvados.collection.RichCollectionBase):
319             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
320                 self.inode,
321                 self.inodes,
322                 self.apiconfig,
323                 self._enable_write,
324                 self._filters,
325                 item,
326                 self.collection_root,
327             ))
328             self._entries[name].populate(mtime)
329         else:
330             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
331         item.fuse_entry = self._entries[name]
332
333     def on_event(self, event, collection, name, item):
334         # These are ADD/DEL/MOD events from the Collection object,
335         # emitted by operations on the Collection object (like
336         # "mkdirs" or "remove") and by "update".  We need to synchronize
337         # them with our FUSE objects that are assigned inodes.
338         if collection == self.collection:
339             name = self.sanitize_filename(name)
340
341             #
342             # It's possible for another thread to have llfuse.lock and
343             # be waiting on collection.lock.  Meanwhile, we released
344             # llfuse.lock earlier in the stack, but are still holding
345             # on to the collection lock, and now we need to re-acquire
346             # llfuse.lock.  If we don't release the collection lock,
347             # we'll deadlock where we're holding the collection lock
348             # waiting for llfuse.lock and the other thread is holding
349             # llfuse.lock and waiting for the collection lock.
350             #
351             # The correct locking order here is to take llfuse.lock
352             # first, then the collection lock.
353             #
354             # Since collection.lock is an RLock, it might be locked
355             # multiple times, so we need to release it multiple times,
356             # keep a count, then re-lock it the correct number of
357             # times.
358             #
359             lockcount = 0
360             try:
361                 while True:
362                     self.collection.lock.release()
363                     lockcount += 1
364             except RuntimeError:
365                 pass
366
367             try:
368                 with llfuse.lock:
369                     with self.collection.lock:
370                         if event == arvados.collection.ADD:
371                             self.new_entry(name, item, self.mtime())
372                         elif event == arvados.collection.DEL:
373                             ent = self._entries[name]
374                             del self._entries[name]
375                             self.inodes.invalidate_entry(self, name)
376                             self.inodes.del_entry(ent)
377                         elif event == arvados.collection.MOD:
378                             if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
379                                 self.inodes.invalidate_inode(item.fuse_entry)
380                             elif name in self._entries:
381                                 self.inodes.invalidate_inode(self._entries[name])
382
383                         if self.collection_record_file is not None:
384                             self.collection_record_file.invalidate()
385                             self.inodes.invalidate_inode(self.collection_record_file)
386             finally:
387                 while lockcount > 0:
388                     self.collection.lock.acquire()
389                     lockcount -= 1
390
391     def populate(self, mtime):
392         self._mtime = mtime
393         with self.collection.lock:
394             self.collection.subscribe(self.on_event)
395             for entry, item in self.collection.items():
396                 self.new_entry(entry, item, self.mtime())
397
398     def writable(self):
399         return self._enable_write and self.collection.writable()
400
401     @use_counter
402     def flush(self):
403         self.collection_root.flush()
404
405     @use_counter
406     @check_update
407     def create(self, name):
408         if not self.writable():
409             raise llfuse.FUSEError(errno.EROFS)
410         with llfuse.lock_released:
411             self.collection.open(name, "w").close()
412
413     @use_counter
414     @check_update
415     def mkdir(self, name):
416         if not self.writable():
417             raise llfuse.FUSEError(errno.EROFS)
418         with llfuse.lock_released:
419             self.collection.mkdirs(name)
420
421     @use_counter
422     @check_update
423     def unlink(self, name):
424         if not self.writable():
425             raise llfuse.FUSEError(errno.EROFS)
426         with llfuse.lock_released:
427             self.collection.remove(name)
428         self.flush()
429
430     @use_counter
431     @check_update
432     def rmdir(self, name):
433         if not self.writable():
434             raise llfuse.FUSEError(errno.EROFS)
435         with llfuse.lock_released:
436             self.collection.remove(name)
437         self.flush()
438
439     @use_counter
440     @check_update
441     def rename(self, name_old, name_new, src):
442         if not self.writable():
443             raise llfuse.FUSEError(errno.EROFS)
444
445         if not isinstance(src, CollectionDirectoryBase):
446             raise llfuse.FUSEError(errno.EPERM)
447
448         if name_new in self:
449             ent = src[name_old]
450             tgt = self[name_new]
451             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
452                 pass
453             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
454                 if len(tgt) > 0:
455                     raise llfuse.FUSEError(errno.ENOTEMPTY)
456             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
457                 raise llfuse.FUSEError(errno.ENOTDIR)
458             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
459                 raise llfuse.FUSEError(errno.EISDIR)
460
461         with llfuse.lock_released:
462             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
463         self.flush()
464         src.flush()
465
466     def clear(self):
467         super(CollectionDirectoryBase, self).clear()
468         self.collection = None
469
470
471 class CollectionDirectory(CollectionDirectoryBase):
472     """Represents the root of a directory tree representing a collection."""
473
474     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
475         super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
476         self.api = api
477         self.num_retries = num_retries
478         self._poll = True
479         try:
480             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
481         except Exception:
482             _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
483             self._poll_time = 60*60
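        # With the stock default blobSignatureTtl of 7200 seconds, the poll
        # interval works out to 3600 seconds, i.e. roughly once an hour.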
484
485         if isinstance(collection_record, dict):
486             self.collection_locator = collection_record['uuid']
487             self._mtime = convertTime(collection_record.get('modified_at'))
488         else:
489             self.collection_locator = collection_record
490             self._mtime = 0
491         self._manifest_size = 0
492         if self.collection_locator:
493             self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
494         self._updating_lock = threading.Lock()
495
496     def same(self, i):
497         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
498
499     def writable(self):
500         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
501
502     @use_counter
503     def flush(self):
504         if not self.writable():
505             return
506         with llfuse.lock_released:
507             with self._updating_lock:
508                 if self.collection.committed():
509                     self.collection.update()
510                 else:
511                     self.collection.save()
512                 self.new_collection_record(self.collection.api_response())
513
514     def want_event_subscribe(self):
515         return (uuid_pattern.match(self.collection_locator) is not None)
516
517     def new_collection(self, new_collection_record, coll_reader):
518         if self.inode:
519             self.clear()
520         self.collection = coll_reader
521         self.new_collection_record(new_collection_record)
522         self.populate(self.mtime())
523
524     def new_collection_record(self, new_collection_record):
525         if not new_collection_record:
526             raise Exception("invalid new_collection_record")
527         self._mtime = convertTime(new_collection_record.get('modified_at'))
528         self._manifest_size = len(new_collection_record["manifest_text"])
529         self.collection_locator = new_collection_record["uuid"]
530         if self.collection_record_file is not None:
531             self.collection_record_file.invalidate()
532             self.inodes.invalidate_inode(self.collection_record_file)
533             _logger.debug("%s invalidated collection record file", self)
534         self.fresh()
535
536     def uuid(self):
537         return self.collection_locator
538
539     @use_counter
540     def update(self):
541         try:
542             if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
543                 # It's immutable, nothing to update
544                 return True
545
546             if self.collection_locator is None:
547                 # No collection locator to retrieve from
548                 self.fresh()
549                 return True
550
551             new_collection_record = None
552             try:
553                 with llfuse.lock_released:
554                     self._updating_lock.acquire()
555                     if not self.stale():
556                         return True
557
558                     _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
559                     coll_reader = None
560                     if self.collection is not None:
561                         # Already have a collection object
562                         self.collection.update()
563                         new_collection_record = self.collection.api_response()
564                     else:
565                         # Create a new collection object
566                         if uuid_pattern.match(self.collection_locator):
567                             coll_reader = arvados.collection.Collection(
568                                 self.collection_locator, self.api, self.api.keep,
569                                 num_retries=self.num_retries)
570                         else:
571                             coll_reader = arvados.collection.CollectionReader(
572                                 self.collection_locator, self.api, self.api.keep,
573                                 num_retries=self.num_retries)
574                         new_collection_record = coll_reader.api_response() or {}
575                         # If the Collection only exists in Keep, there will be no API
576                         # response.  Fill in the fields we need.
577                         if 'uuid' not in new_collection_record:
578                             new_collection_record['uuid'] = self.collection_locator
579                         if "portable_data_hash" not in new_collection_record:
580                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
581                         if 'manifest_text' not in new_collection_record:
582                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
583                         if 'storage_classes_desired' not in new_collection_record:
584                             new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
585
586                 # end with llfuse.lock_released, re-acquire lock
587
588                 if new_collection_record is not None:
589                     if coll_reader is not None:
590                         self.new_collection(new_collection_record, coll_reader)
591                     else:
592                         self.new_collection_record(new_collection_record)
593
594                 return True
595             finally:
596                 self._updating_lock.release()
597         except arvados.errors.NotFoundError as e:
598             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
599         except arvados.errors.ArgumentError as detail:
600             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
601             if new_collection_record is not None and "manifest_text" in new_collection_record:
602                 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
603         except Exception:
604             _logger.exception("arv-mount %s: error", self.collection_locator)
605             if new_collection_record is not None and "manifest_text" in new_collection_record:
606                 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
607         self.invalidate()
608         return False
609
610     @use_counter
611     def collection_record(self):
612         self.flush()
613         return self.collection.api_response()
614
615     @use_counter
616     @check_update
617     def __getitem__(self, item):
618         if item == '.arvados#collection':
619             if self.collection_record_file is None:
620                 self.collection_record_file = FuncToJSONFile(
621                     self.inode, self.collection_record)
622                 self.inodes.add_entry(self.collection_record_file)
623             self.invalidate()  # use lookup as a signal to force update
624             return self.collection_record_file
625         else:
626             return super(CollectionDirectory, self).__getitem__(item)
627
628     def __contains__(self, k):
629         if k == '.arvados#collection':
630             return True
631         else:
632             return super(CollectionDirectory, self).__contains__(k)
633
634     def invalidate(self):
635         if self.collection_record_file is not None:
636             self.collection_record_file.invalidate()
637             self.inodes.invalidate_inode(self.collection_record_file)
638         super(CollectionDirectory, self).invalidate()
639
640     def persisted(self):
641         return (self.collection_locator is not None)
642
643     def objsize(self):
644         # This is an empirically-derived heuristic to estimate the memory used
645         # to store this collection's metadata.  Calculating the memory
646         # footprint directly would be more accurate, but also more complicated.
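        # As a rough illustration of the heuristic: a collection whose manifest
        # is 1 MiB of text is charged about 128 MiB toward the cache budget.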
647         return self._manifest_size * 128
648
649     def finalize(self):
650         if self.collection is not None:
651             if self.writable():
652                 self.collection.save()
653             self.collection.stop_threads()
654
655     def clear(self):
656         if self.collection is not None:
657             self.collection.stop_threads()
658         super(CollectionDirectory, self).clear()
659         self._manifest_size = 0
660
661
662 class TmpCollectionDirectory(CollectionDirectoryBase):
663     """A directory backed by an Arvados collection that never gets saved.
664
665     This supports using Keep as scratch space. A userspace program can
666     read the .arvados#collection file to get a current manifest in
667     order to save a snapshot of the scratch data or use it as a crunch
668     job output.
669     """
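
    # Illustrative use from the shell (the mount point path is hypothetical):
    #
    #   $ cp input.dat /mnt/scratch/
    #   $ cat '/mnt/scratch/.arvados#collection'   # JSON including manifest_text
    #
    # The JSON comes from collection_record() below and reflects the current
    # in-memory manifest.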
670
671     class UnsaveableCollection(arvados.collection.Collection):
672         def save(self):
673             pass
674         def save_new(self):
675             pass
676
677     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
678         collection = self.UnsaveableCollection(
679             api_client=api_client,
680             keep_client=api_client.keep,
681             num_retries=num_retries,
682             storage_classes_desired=storage_classes)
683         # This is always enable_write=True because it never tries to
684         # save to the backend
685         super(TmpCollectionDirectory, self).__init__(
686             parent_inode, inodes, api_client.config, True, filters, collection, self)
687         self.populate(self.mtime())
688
689     def on_event(self, *args, **kwargs):
690         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
691         if self.collection_record_file is None:
692             return
693
694         # See discussion in CollectionDirectoryBase.on_event
695         lockcount = 0
696         try:
697             while True:
698                 self.collection.lock.release()
699                 lockcount += 1
700         except RuntimeError:
701             pass
702
703         try:
704             with llfuse.lock:
705                 with self.collection.lock:
706                     self.collection_record_file.invalidate()
707                     self.inodes.invalidate_inode(self.collection_record_file)
708                     _logger.debug("%s invalidated collection record", self)
709         finally:
710             while lockcount > 0:
711                 self.collection.lock.acquire()
712                 lockcount -= 1
713
714     def collection_record(self):
715         with llfuse.lock_released:
716             return {
717                 "uuid": None,
718                 "manifest_text": self.collection.manifest_text(),
719                 "portable_data_hash": self.collection.portable_data_hash(),
720                 "storage_classes_desired": self.collection.storage_classes_desired(),
721             }
722
723     def __contains__(self, k):
724         return (k == '.arvados#collection' or
725                 super(TmpCollectionDirectory, self).__contains__(k))
726
727     @use_counter
728     def __getitem__(self, item):
729         if item == '.arvados#collection':
730             if self.collection_record_file is None:
731                 self.collection_record_file = FuncToJSONFile(
732                     self.inode, self.collection_record)
733                 self.inodes.add_entry(self.collection_record_file)
734             return self.collection_record_file
735         return super(TmpCollectionDirectory, self).__getitem__(item)
736
737     def persisted(self):
738         return False
739
740     def writable(self):
741         return True
742
743     def flush(self):
744         pass
745
746     def want_event_subscribe(self):
747         return False
748
749     def finalize(self):
750         self.collection.stop_threads()
751
752     def invalidate(self):
753         if self.collection_record_file:
754             self.collection_record_file.invalidate()
755         super(TmpCollectionDirectory, self).invalidate()
756
757
758 class MagicDirectory(Directory):
759     """A special directory that logically contains the set of all extant keep locators.
760
761     When a file is referenced by lookup(), it is tested to see if it is a valid
762     keep locator to a manifest, and if so, the manifest contents are loaded as a
763     subdirectory of this directory with the locator as the directory name.
764     Since querying a list of all extant keep locators is impractical, only
765     collections that have already been accessed are visible to readdir().
766
767     """
768
769     README_TEXT = """
770 This directory provides access to Arvados collections as subdirectories listed
771 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
772 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
773 (in the form 'zzzzz-j7d0g-1234567890abcde').
774
775 Note that this directory will appear empty until you attempt to access a
776 specific collection or project subdirectory (such as trying to 'cd' into it),
777 at which point the collection or project will actually be looked up on the server
778 and the directory will appear if it exists.
779
780 """.lstrip()
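
    # For example (paths and identifiers below are illustrative only):
    #
    #   $ ls <mountpoint>/1234567890abcdef0123456789abcdef+123
    #   $ cd <mountpoint>/zzzzz-j7d0g-1234567890abcde
    #
    # Each such lookup goes through __contains__ below, which instantiates the
    # matching CollectionDirectory or ProjectDirectory on demand.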
781
782     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
783         super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
784         self.api = api
785         self.num_retries = num_retries
786         self.pdh_only = pdh_only
787         self.storage_classes = storage_classes
788
789     def __setattr__(self, name, value):
790         super(MagicDirectory, self).__setattr__(name, value)
791         # When we're assigned an inode, add a README.
792         if ((name == 'inode') and (self.inode is not None) and
793               (not self._entries)):
794             self._entries['README'] = self.inodes.add_entry(
795                 StringFile(self.inode, self.README_TEXT, time.time()))
796             # If we're the root directory, add an identical by_id subdirectory.
797             if self.inode == llfuse.ROOT_INODE:
798                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
799                     self.inode,
800                     self.inodes,
801                     self.api,
802                     self.num_retries,
803                     self._enable_write,
804                     self._filters,
805                     self.pdh_only,
806                 ))
807
808     def __contains__(self, k):
809         if k in self._entries:
810             return True
811
812         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
813             return False
814
815         try:
816             e = None
817
818             if group_uuid_pattern.match(k):
819                 project = self.api.groups().list(
820                     filters=[
821                         ['group_class', 'in', ['project','filter']],
822                         ["uuid", "=", k],
823                         *self._filters_for('groups', qualified=False),
824                     ],
825                 ).execute(num_retries=self.num_retries)
826                 if project[u'items_available'] == 0:
827                     return False
828                 e = self.inodes.add_entry(ProjectDirectory(
829                     self.inode,
830                     self.inodes,
831                     self.api,
832                     self.num_retries,
833                     self._enable_write,
834                     self._filters,
835                     project[u'items'][0],
836                     storage_classes=self.storage_classes,
837                 ))
838             else:
839                 e = self.inodes.add_entry(CollectionDirectory(
840                     self.inode,
841                     self.inodes,
842                     self.api,
843                     self.num_retries,
844                     self._enable_write,
845                     self._filters,
846                     k,
847                 ))
848
849             if e.update():
850                 if k not in self._entries:
851                     self._entries[k] = e
852                 else:
853                     self.inodes.del_entry(e)
854                 return True
855             else:
856                 self.inodes.invalidate_entry(self, k)
857                 self.inodes.del_entry(e)
858                 return False
859         except Exception as ex:
860             _logger.exception("arv-mount lookup '%s':", k)
861             if e is not None:
862                 self.inodes.del_entry(e)
863             return False
864
865     def __getitem__(self, item):
866         if item in self:
867             return self._entries[item]
868         else:
869             raise KeyError("No collection with id " + item)
870
871     def clear(self):
872         pass
873
874     def want_event_subscribe(self):
875         return not self.pdh_only
876
877
878 class TagsDirectory(Directory):
879     """A special directory that contains as subdirectories all tags visible to the user."""
880
881     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
882         super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
883         self.api = api
884         self.num_retries = num_retries
885         self._poll = True
886         self._poll_time = poll_time
887         self._extra = set()
888
889     def want_event_subscribe(self):
890         return True
891
892     @use_counter
893     def update(self):
894         with llfuse.lock_released:
895             tags = self.api.links().list(
896                 filters=[
897                     ['link_class', '=', 'tag'],
898                     ['name', '!=', ''],
899                     *self._filters_for('links', qualified=False),
900                 ],
901                 select=['name'],
902                 distinct=True,
903                 limit=1000,
904             ).execute(num_retries=self.num_retries)
905         if "items" in tags:
906             self.merge(
907                 tags['items']+[{"name": n} for n in self._extra],
908                 lambda i: i['name'],
909                 lambda a, i: a.tag == i['name'],
910                 lambda i: TagDirectory(
911                     self.inode,
912                     self.inodes,
913                     self.api,
914                     self.num_retries,
915                     self._enable_write,
916                     self._filters,
917                     i['name'],
918                     poll=self._poll,
919                     poll_time=self._poll_time,
920                 ),
921             )
922
923     @use_counter
924     @check_update
925     def __getitem__(self, item):
926         if super(TagsDirectory, self).__contains__(item):
927             return super(TagsDirectory, self).__getitem__(item)
928         with llfuse.lock_released:
929             tags = self.api.links().list(
930                 filters=[
931                     ['link_class', '=', 'tag'],
932                     ['name', '=', item],
933                     *self._filters_for('links', qualified=False),
934                 ],
935                 limit=1,
936             ).execute(num_retries=self.num_retries)
937         if tags["items"]:
938             self._extra.add(item)
939             self.update()
940         return super(TagsDirectory, self).__getitem__(item)
941
942     @use_counter
943     @check_update
944     def __contains__(self, k):
945         if super(TagsDirectory, self).__contains__(k):
946             return True
947         try:
948             self[k]
949             return True
950         except KeyError:
951             pass
952         return False
953
954
955 class TagDirectory(Directory):
956     """A special directory that contains as subdirectories all collections visible
957     to the user that are tagged with a particular tag.
958     """
959
960     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
961                  poll=False, poll_time=60):
962         super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
963         self.api = api
964         self.num_retries = num_retries
965         self.tag = tag
966         self._poll = poll
967         self._poll_time = poll_time
968
969     def want_event_subscribe(self):
970         return True
971
972     @use_counter
973     def update(self):
974         with llfuse.lock_released:
975             taggedcollections = self.api.links().list(
976                 filters=[
977                     ['link_class', '=', 'tag'],
978                     ['name', '=', self.tag],
979                     ['head_uuid', 'is_a', 'arvados#collection'],
980                     *self._filters_for('links', qualified=False),
981                 ],
982                 select=['head_uuid'],
983             ).execute(num_retries=self.num_retries)
984         self.merge(
985             taggedcollections['items'],
986             lambda i: i['head_uuid'],
987             lambda a, i: a.collection_locator == i['head_uuid'],
988             lambda i: CollectionDirectory(
989                 self.inode,
990                 self.inodes,
991                 self.api,
992                 self.num_retries,
993                 self._enable_write,
994                 self._filters,
995                 i['head_uuid'],
996             ),
997         )
998
999
1000 class ProjectDirectory(Directory):
1001     """A special directory that contains the contents of a project."""
1002
1003     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1004                  project_object, poll=True, poll_time=3, storage_classes=None):
1005         super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
1006         self.api = api
1007         self.num_retries = num_retries
1008         self.project_object = project_object
1009         self.project_object_file = None
1010         self.project_uuid = project_object['uuid']
1011         self._poll = poll
1012         self._poll_time = poll_time
1013         self._updating_lock = threading.Lock()
1014         self._current_user = None
1015         self._full_listing = False
1016         self.storage_classes = storage_classes
1017
1018     def want_event_subscribe(self):
1019         return True
1020
1021     def createDirectory(self, i):
1022         common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1023         if collection_uuid_pattern.match(i['uuid']):
1024             return CollectionDirectory(*common_args, i)
1025         elif group_uuid_pattern.match(i['uuid']):
1026             return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1027         elif link_uuid_pattern.match(i['uuid']):
1028             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1029                 return CollectionDirectory(*common_args, i['head_uuid'])
1030             else:
1031                 return None
1032         elif uuid_pattern.match(i['uuid']):
1033             return ObjectFile(self.parent_inode, i)
1034         else:
1035             return None
1036
1037     def uuid(self):
1038         return self.project_uuid
1039
1040     def items(self):
1041         self._full_listing = True
1042         return super(ProjectDirectory, self).items()
1043
1044     def namefn(self, i):
1045         if 'name' in i:
1046             if i['name'] is None or len(i['name']) == 0:
1047                 return None
1048             elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1049                 # collection or subproject
1050                 return i['name']
1051             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1052                 # name link
1053                 return i['name']
1054             elif 'kind' in i and i['kind'].startswith('arvados#'):
1055                 # something else
1056                 return "{}.{}".format(i['name'], i['kind'][8:])
1057         else:
1058             return None
1059
1060
1061     @use_counter
1062     def update(self):
1063         if self.project_object_file is None:
1064             self.project_object_file = ObjectFile(self.inode, self.project_object)
1065             self.inodes.add_entry(self.project_object_file)
1066
1067         if not self._full_listing:
1068             return True
1069
1070         def samefn(a, i):
1071             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1072                 return a.uuid() == i['uuid']
1073             elif isinstance(a, ObjectFile):
1074                 return a.uuid() == i['uuid'] and not a.stale()
1075             return False
1076
1077         try:
1078             with llfuse.lock_released:
1079                 _logger.debug("Getting lock to update %s", self.project_uuid)
1080                 self._updating_lock.acquire()
1081                 if not self.stale():
1082                     _logger.debug("%s was updated already", self.project_uuid)
1083                     return
1084
1085                 _logger.debug("Requesting update of %s", self.project_uuid)
1086
1087                 if group_uuid_pattern.match(self.project_uuid):
1088                     self.project_object = self.api.groups().get(
1089                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
1090                 elif user_uuid_pattern.match(self.project_uuid):
1091                     self.project_object = self.api.users().get(
1092                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
1093                 # do this in 2 steps until #17424 is fixed
1094                 contents = list(arvados.util.keyset_list_all(
1095                     self.api.groups().contents,
1096                     order_key='uuid',
1097                     num_retries=self.num_retries,
1098                     uuid=self.project_uuid,
1099                     filters=[
1100                         ['uuid', 'is_a', 'arvados#group'],
1101                         ['groups.group_class', 'in', ['project', 'filter']],
1102                         *self._filters_for('groups', qualified=True),
1103                     ],
1104                 ))
1105                 contents.extend(obj for obj in arvados.util.keyset_list_all(
1106                     self.api.groups().contents,
1107                     order_key='uuid',
1108                     num_retries=self.num_retries,
1109                     uuid=self.project_uuid,
1110                     filters=[
1111                         ['uuid', 'is_a', 'arvados#collection'],
1112                         *self._filters_for('collections', qualified=True),
1113                     ],
1114                 ) if obj['current_version_uuid'] == obj['uuid'])
1115             # end with llfuse.lock_released, re-acquire lock
1116
1117             self.merge(contents,
1118                        self.namefn,
1119                        samefn,
1120                        self.createDirectory)
1121             return True
1122         finally:
1123             self._updating_lock.release()
1124
1125     def _add_entry(self, i, name):
1126         ent = self.createDirectory(i)
1127         self._entries[name] = self.inodes.add_entry(ent)
1128         return self._entries[name]
1129
1130     @use_counter
1131     @check_update
1132     def __getitem__(self, k):
1133         if k == '.arvados#project':
1134             return self.project_object_file
1135         elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1136             return super(ProjectDirectory, self).__getitem__(k)
1137         with llfuse.lock_released:
1138             k2 = self.unsanitize_filename(k)
1139             if k2 == k:
1140                 namefilter = ["name", "=", k]
1141             else:
1142                 namefilter = ["name", "in", [k, k2]]
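            # e.g. with a hypothetical ForwardSlashNameSubstitution of "_", a
            # lookup of "sub_dir" searches the API server for both "sub_dir"
            # and "sub/dir".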
1143             contents = self.api.groups().list(
1144                 filters=[
1145                     ["owner_uuid", "=", self.project_uuid],
1146                     ["group_class", "in", ["project","filter"]],
1147                     namefilter,
1148                     *self._filters_for('groups', qualified=False),
1149                 ],
1150                 limit=2,
1151             ).execute(num_retries=self.num_retries)["items"]
1152             if not contents:
1153                 contents = self.api.collections().list(
1154                     filters=[
1155                         ["owner_uuid", "=", self.project_uuid],
1156                         namefilter,
1157                         *self._filters_for('collections', qualified=False),
1158                     ],
1159                     limit=2,
1160                 ).execute(num_retries=self.num_retries)["items"]
1161         if contents:
1162             if len(contents) > 1 and contents[1]['name'] == k:
1163                 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1164                 # "foo[SUBST]bar".
1165                 contents = [contents[1]]
1166             name = self.sanitize_filename(self.namefn(contents[0]))
1167             if name != k:
1168                 raise KeyError(k)
1169             return self._add_entry(contents[0], name)
1170
1171         # Didn't find item
1172         raise KeyError(k)
1173
1174     def __contains__(self, k):
1175         if k == '.arvados#project':
1176             return True
1177         try:
1178             self[k]
1179             return True
1180         except KeyError:
1181             pass
1182         return False
1183
1184     @use_counter
1185     @check_update
1186     def writable(self):
1187         if not self._enable_write:
1188             return False
1189         with llfuse.lock_released:
1190             if not self._current_user:
1191                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1192             return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1193
1194     def persisted(self):
1195         return True
1196
1197     @use_counter
1198     @check_update
1199     def mkdir(self, name):
1200         if not self.writable():
1201             raise llfuse.FUSEError(errno.EROFS)
1202
1203         try:
1204             with llfuse.lock_released:
1205                 c = {
1206                     "owner_uuid": self.project_uuid,
1207                     "name": name,
1208                     "manifest_text": "" }
1209                 if self.storage_classes is not None:
1210                     c["storage_classes_desired"] = self.storage_classes
1211                 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1215             self.invalidate()
1216         except apiclient_errors.Error as error:
1217             _logger.error(error)
1218             raise llfuse.FUSEError(errno.EEXIST)
1219
1220     @use_counter
1221     @check_update
1222     def rmdir(self, name):
1223         if not self.writable():
1224             raise llfuse.FUSEError(errno.EROFS)
1225
1226         if name not in self:
1227             raise llfuse.FUSEError(errno.ENOENT)
1228         if not isinstance(self[name], CollectionDirectory):
1229             raise llfuse.FUSEError(errno.EPERM)
1230         if len(self[name]) > 0:
1231             raise llfuse.FUSEError(errno.ENOTEMPTY)
1232         with llfuse.lock_released:
1233             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1234         self.invalidate()
1235
1236     @use_counter
1237     @check_update
1238     def rename(self, name_old, name_new, src):
1239         if not self.writable():
1240             raise llfuse.FUSEError(errno.EROFS)
1241
1242         if not isinstance(src, ProjectDirectory):
1243             raise llfuse.FUSEError(errno.EPERM)
1244
1245         ent = src[name_old]
1246
1247         if not isinstance(ent, CollectionDirectory):
1248             raise llfuse.FUSEError(errno.EPERM)
1249
1250         if name_new in self:
1251             # POSIX semantics for replacing one directory with another are
1252             # tricky (the target directory must be empty, and the operation
1253             # must be atomic, which isn't possible with the Arvados API as of
1254             # this writing), so don't support that.
1255             raise llfuse.FUSEError(errno.EPERM)
1256
1257         self.api.collections().update(uuid=ent.uuid(),
1258                                       body={"owner_uuid": self.uuid(),
1259                                             "name": name_new}).execute(num_retries=self.num_retries)
1260
1261         # Actually move the entry from the source directory to this directory.
1262         del src._entries[name_old]
1263         self._entries[name_new] = ent
1264         self.inodes.invalidate_entry(src, name_old)
1265
1266     @use_counter
1267     def child_event(self, ev):
1268         properties = ev.get("properties") or {}
1269         old_attrs = properties.get("old_attributes") or {}
1270         new_attrs = properties.get("new_attributes") or {}
1271         old_attrs["uuid"] = ev["object_uuid"]
1272         new_attrs["uuid"] = ev["object_uuid"]
1273         old_name = self.sanitize_filename(self.namefn(old_attrs))
1274         new_name = self.sanitize_filename(self.namefn(new_attrs))
1275
1276         # create events will have a new name, but not an old name
1277         # delete events will have an old name, but not a new name
1278         # update events will have an old and new name, and they may be the same or different
1279         # if they are the same, an unrelated field changed and there is nothing to do.
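        #
        # For example, renaming a collection within this project produces an
        # update event where old_name != new_name; the code below then moves
        # the existing entry to the new name instead of recreating it.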
1280
1281         if old_attrs.get("owner_uuid") != self.project_uuid:
1282             # Was moved from somewhere else, so don't try to remove entry.
1283             old_name = None
1284         if ev.get("object_owner_uuid") != self.project_uuid:
1285             # Was moved to somewhere else, so don't try to add entry
1286             new_name = None
1287
1288         if old_attrs.get("is_trashed"):
1289             # Was previously deleted
1290             old_name = None
1291         if new_attrs.get("is_trashed"):
1292             # Has been deleted
1293             new_name = None
1294
1295         if new_name != old_name:
1296             ent = None
1297             if old_name in self._entries:
1298                 ent = self._entries[old_name]
1299                 del self._entries[old_name]
1300                 self.inodes.invalidate_entry(self, old_name)
1301
1302             if new_name:
1303                 if ent is not None:
1304                     self._entries[new_name] = ent
1305                 else:
1306                     self._add_entry(new_attrs, new_name)
1307             elif ent is not None:
1308                 self.inodes.del_entry(ent)
1309
1310
1311 class SharedDirectory(Directory):
1312     """A special directory that represents users or groups who have shared projects with me."""
1313
1314     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1315                  exclude, poll=False, poll_time=60, storage_classes=None):
1316         super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
1317         self.api = api
1318         self.num_retries = num_retries
1319         self.current_user = api.users().current().execute(num_retries=num_retries)
1320         self._poll = True
1321         self._poll_time = poll_time
1322         self._updating_lock = threading.Lock()
1323         self.storage_classes = storage_classes
1324
1325     @use_counter
1326     def update(self):
1327         try:
1328             with llfuse.lock_released:
1329                 self._updating_lock.acquire()
1330                 if not self.stale():
1331                     return
1332
1333                 contents = {}
1334                 roots = []
1335                 root_owners = set()
1336                 objects = {}
1337
1338                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1339                 if 'httpMethod' in methods.get('shared', {}):
1340                     page = []
1341                     while True:
1342                         resp = self.api.groups().shared(
1343                             filters=[
1344                                 ['group_class', 'in', ['project','filter']],
1345                                 *page,
1346                                 *self._filters_for('groups', qualified=False),
1347                             ],
1348                             order="uuid",
1349                             limit=10000,
1350                             count="none",
1351                             include="owner_uuid",
1352                         ).execute()
1353                         if not resp["items"]:
1354                             break
1355                         page = [["uuid", ">", resp["items"][-1]["uuid"]]]
1356                         for r in resp["items"]:
1357                             objects[r["uuid"]] = r
1358                             roots.append(r["uuid"])
1359                         for r in resp["included"]:
1360                             objects[r["uuid"]] = r
1361                             root_owners.add(r["uuid"])
1362                 else:
1363                     all_projects = list(arvados.util.keyset_list_all(
1364                         self.api.groups().list,
1365                         order_key="uuid",
1366                         num_retries=self.num_retries,
1367                         filters=[
1368                             ['group_class', 'in', ['project','filter']],
1369                             *self._filters_for('groups', qualified=False),
1370                         ],
1371                         select=["uuid", "owner_uuid"],
1372                     ))
1373                     for ob in all_projects:
1374                         objects[ob['uuid']] = ob
1375
1376                     current_uuid = self.current_user['uuid']
1377                     for ob in all_projects:
1378                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1379                             roots.append(ob['uuid'])
1380                             root_owners.add(ob['owner_uuid'])
1381
1382                     lusers = arvados.util.keyset_list_all(
1383                         self.api.users().list,
1384                         order_key="uuid",
1385                         num_retries=self.num_retries,
1386                         filters=[
1387                             ['uuid', 'in', list(root_owners)],
1388                             *self._filters_for('users', qualified=False),
1389                         ],
1390                     )
1391                     lgroups = arvados.util.keyset_list_all(
1392                         self.api.groups().list,
1393                         order_key="uuid",
1394                         num_retries=self.num_retries,
1395                         filters=[
1396                             ['uuid', 'in', list(root_owners)+roots],
1397                             *self._filters_for('groups', qualified=False),
1398                         ],
1399                     )
1400                     for l in lusers:
1401                         objects[l["uuid"]] = l
1402                     for l in lgroups:
1403                         objects[l["uuid"]] = l
1404
1405                 for r in root_owners:
1406                     if r in objects:
1407                         obr = objects[r]
1408                         if obr.get("name"):
1409                             contents[obr["name"]] = obr
1410                         elif "first_name" in obr:
1411                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1412
1413                 for r in roots:
1414                     if r in objects:
1415                         obr = objects[r]
1416                         if obr['owner_uuid'] not in objects:
1417                             contents[obr["name"]] = obr
1418
1419             # end with llfuse.lock_released, re-acquire lock
1420
1421             self.merge(
1422                 contents.items(),
1423                 lambda i: i[0],
1424                 lambda a, i: a.uuid() == i[1]['uuid'],
1425                 lambda i: ProjectDirectory(
1426                     self.inode,
1427                     self.inodes,
1428                     self.api,
1429                     self.num_retries,
1430                     self._enable_write,
1431                     self._filters,
1432                     i[1],
1433                     poll=self._poll,
1434                     poll_time=self._poll_time,
1435                     storage_classes=self.storage_classes,
1436                 ),
1437             )
1438         except Exception:
1439             _logger.exception("arv-mount shared dir error")
1440         finally:
1441             self._updating_lock.release()
1442
1443     def want_event_subscribe(self):
1444         return True