Merge branch '19844-paging' refs #19844
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
# Logger used by the directory classes in this module.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
#
# The character class contains exactly two characters: NUL and '/'.
# Directory.sanitize_filename() replaces every match with '_'.
_disallowed_filename_characters = re.compile('[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """Create an empty directory.

        :parent_inode: integer inode number of the parent directory

        :inodes: inode table object; used here to register, forget and
        invalidate entries

        :apiconfig: callable returning the cluster configuration mapping

        :enable_write: write-enable flag, stored for use by subclasses

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        """Return the configured substitute for '/' in names, or None.

        The value is read from
        config["Collections"]["ForwardSlashNameSubstitution"] and cached on
        the instance.  A missing key yields '_'; an empty string or '/'
        yields None (no substitution).
        """
        if not hasattr(self, '_fsns'):
            # Only cache the result once the configuration lookup has
            # succeeded.  If apiconfig() raises, nothing is cached and the
            # next call retries instead of silently returning None forever.
            config = self.apiconfig()
            try:
                fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                fsns = '_'
            else:
                if fsns == '' or fsns == '/':
                    fsns = None
            self._fsns = fsns
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in the config returned by
        self.apiconfig().

        Returns None for None, '_' for '' and '.', and '__' for '..'.
        Otherwise '/' is replaced by the configured substitution (if any)
        and every remaining disallowed character becomes '_'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; the generic directory is never persisted."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and
        swallowed; any other exception propagates.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        # Return a copy so callers can iterate while entries change.
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        for v in self._entries.values():
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """Return True if this directory or any of its entries has a reference.

        only_children is passed to the base implementation for this
        directory; entries are always queried with False.
        """
        if super(Directory, self).has_ref(only_children):
            return True
        for v in self._entries.values():
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        # Detach the dict first so the directory is already empty while the
        # old entries are being cleared and removed from the inode table.
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate the parent's directory entry that points at this object."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        """Return the stored modification time."""
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        pass

    # The operations below have no generic implementation; subclasses that
    # support them override these methods.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
256
257
class CollectionDirectoryBase(Directory):
    """Directory view of an Arvados Collection (or subcollection).

    Used directly for subcollections, and as the base class of
    CollectionDirectory, which adds loading/saving of Collection records.

    Most operations are forwarded to the underlying Arvados `Collection`
    object.  That object reports additions, removals and modifications
    through a notify callback (`CollectionDirectoryBase.on_event`), and the
    FUSE inodes and directory entries are created, deleted or invalidated
    in response.

    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.apiconfig = apiconfig
        self.collection_record_file = None
        self.collection_root = collection_root
        self.collection = collection

    def _raise_unless_writable(self):
        # Shared guard for the mutating operations below.
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

    def new_entry(self, name, item, mtime):
        """Attach `item` to this directory under the sanitized `name`.

        An item that already carries a FUSE entry is re-parented (the entry
        must be dead and still own an inode); otherwise a new subdirectory
        or file entry is registered with the inode table.
        """
        name = self.sanitize_filename(name)
        previous = getattr(item, "fuse_entry", None)
        if previous is not None:
            if previous.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if previous.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            previous.dead = False
            self._entries[name] = previous
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(
                self.inode, self.inodes, self.apiconfig,
                self._enable_write, item, self.collection_root)
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            leaf = FuseArvadosFile(self.inode, item, mtime, self._enable_write)
            self._entries[name] = self.inodes.add_entry(leaf)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Notify callback: mirror a Collection ADD/DEL/MOD into FUSE state.

        Events come from operations on the Collection object (such as
        "mkdirs" or "remove") and from "update"; they have to be kept in
        sync with the FUSE objects that own inodes.  Events for any other
        collection are ignored.
        """
        if not (collection == self.collection):
            return

        name = self.sanitize_filename(name)

        # Lock ordering: llfuse.lock first, then the collection lock.
        #
        # At this point we hold the collection lock but gave up
        # llfuse.lock further up the stack.  Another thread may hold
        # llfuse.lock while waiting for the collection lock, so taking
        # llfuse.lock now without letting go of the collection lock
        # would deadlock.
        #
        # collection.lock is an RLock and may be held several levels
        # deep, so release it until it reports it is no longer ours,
        # remember how many levels that was, and restore exactly that
        # many afterwards.
        released = 0
        try:
            while True:
                self.collection.lock.release()
                released += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock, self.collection.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    removed = self._entries.pop(name)
                    self.inodes.invalidate_entry(self, name)
                    self.inodes.del_entry(removed)
                elif event == arvados.collection.MOD:
                    if getattr(item, "fuse_entry", None) is not None:
                        self.inodes.invalidate_inode(item.fuse_entry)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name])

                record_file = self.collection_record_file
                if record_file is not None:
                    record_file.invalidate()
                    self.inodes.invalidate_inode(record_file)
        finally:
            for _ in range(released):
                self.collection.lock.acquire()

    def populate(self, mtime):
        """Set the mtime, subscribe to collection events and add all items."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for child_name, child in self.collection.items():
                self.new_entry(child_name, child, self.mtime())

    def writable(self):
        """Return a true value only if writes are enabled and the collection is writable."""
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        # Flushing is delegated to the root of the collection tree.
        self.collection_root.flush()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name`; raises EROFS when read-only."""
        self._raise_unless_writable()
        with llfuse.lock_released:
            new_file = self.collection.open(name, "w")
            new_file.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory `name`; raises EROFS when read-only."""
        self._raise_unless_writable()
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection and flush; raises EROFS when read-only."""
        self._raise_unless_writable()
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection and flush; raises EROFS when read-only."""
        self._raise_unless_writable()
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises FUSEError with EROFS (read-only), EPERM (source is not a
        collection directory), ENOTEMPTY (target directory has entries),
        ENOTDIR (directory onto file) or EISDIR (file onto directory).
        """
        self._raise_unless_writable()

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            existing = self[name_new]
            moving_is_file = isinstance(moving, FuseArvadosFile)
            existing_is_file = isinstance(existing, FuseArvadosFile)
            if not (moving_is_file and existing_is_file):
                moving_is_dir = isinstance(moving, CollectionDirectoryBase)
                existing_is_dir = isinstance(existing, CollectionDirectoryBase)
                if moving_is_dir and existing_is_dir:
                    # A directory may only replace an empty directory.
                    if len(existing) > 0:
                        raise llfuse.FUSEError(errno.ENOTEMPTY)
                elif moving_is_dir and existing_is_file:
                    raise llfuse.FUSEError(errno.ENOTDIR)
                elif moving_is_file and existing_is_dir:
                    raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Drop all entries, then drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
432
433
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        """Create the root directory for one collection.

        :api: API client; `api.config` is passed on as the config callable
        and `api.keep` is used when the collection is loaded

        :num_retries: retry count passed to the collection object

        :enable_write: write-enable flag for this mount

        :collection_record: either a dict with at least 'uuid' (and
        optionally 'modified_at'), or a locator string, or None

        :explicit_collection: accepted for interface compatibility; not
        used by this constructor
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            # Best effort: fall back to one hour when the discovery
            # document is unavailable or malformed.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable: writable() reads it whenever no
        # collection object is loaded, including when there is no locator.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` refers to this collection by uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Return whether writes are allowed.

        Asks the loaded collection when there is one; otherwise falls back
        to the value derived from the locator at construction time.
        """
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Update or save the collection and refresh the cached record.

        Does nothing when the directory is not writable or no collection
        object is loaded.
        """
        if not self.writable():
            return
        if self.collection is None:
            # Nothing loaded, so nothing to flush.
            return
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        """Return True if the locator is set and matches the uuid pattern."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the backing collection object and repopulate entries."""
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        """Adopt mtime, manifest size and uuid from a collection record.

        Raises Exception when the record is empty or None.
        """
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)
        self.fresh()

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self):
        """Load or refresh the collection.

        Returns True on success (including when there is nothing to do),
        and False after logging an error, in which case the directory is
        invalidated.
        """
        # Bound before the outer try so the except handlers below can
        # always refer to it.
        new_collection_record = None
        try:
            if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
                # It's immutable, nothing to update
                return True

            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            # Track whether we actually hold _updating_lock, so the finally
            # clause never releases a lock this call did not acquire.
            lock_held = False
            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    lock_held = True
                    if not self.stale():
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # If there's too many prefetch threads and you
                        # max out the CPU, delivering data to the FUSE
                        # layer actually ends up being slower.
                        # Experimentally, capping 7 threads seems to
                        # be a sweet spot.
                        get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries,
                                get_threads=get_threads)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries,
                                get_threads=get_threads)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                if lock_held:
                    self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    def collection_record(self):
        """Flush pending changes and return the collection's API response."""
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            # The record file is created lazily on first lookup.
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        # The virtual record file is always reported as present.
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Invalidate the record file (if created) and then this directory."""
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when a collection locator is set."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop collection threads, drop all entries and reset the manifest size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
631
632
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Directory backed by an Arvados collection that is never saved.

    This lets Keep be used as scratch space.  A userspace program can read
    the .arvados#collection file to obtain the current manifest, either to
    snapshot the scratch data or to use it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        # Both save operations are deliberately no-ops.
        def save(self):
            pass
        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # enable_write is forced to True here: this directory never
        # tries to save to the backend.
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, scratch, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Run the base handler, then invalidate the record file if one exists."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is not None:
            # Same lock juggling as CollectionDirectoryBase.on_event:
            # fully release the (re-entrant) collection lock, take
            # llfuse.lock followed by the collection lock, and finally
            # restore the original hold depth.
            released = 0
            try:
                while True:
                    self.collection.lock.release()
                    released += 1
            except RuntimeError:
                pass

            try:
                with llfuse.lock, self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self)
            finally:
                for _ in range(released):
                    self.collection.lock.acquire()

    def collection_record(self):
        """Return a record dict describing the current scratch collection."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            classes = self.collection.storage_classes_desired()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
                "storage_classes_desired": classes,
            }

    def __contains__(self, k):
        # The virtual record file is always reported as present.
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        # The record file is created lazily on first lookup.
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        """Return False; this directory is never persisted."""
        return False

    def writable(self):
        """Return True; this directory is always writable."""
        return True

    def flush(self):
        # Nothing to flush: the collection is never saved.
        pass

    def want_event_subscribe(self):
        """Return False; no event subscription is wanted."""
        return False

    def finalize(self):
        """Stop the collection's threads."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if any), then this directory."""
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
727
728
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        """Set up the directory.

        parent_inode, inodes and enable_write are handed to Directory.
        api is the API client used for lookups and num_retries is passed to
        every request.  When pdh_only is true, only portable data hashes are
        accepted as names.  storage_classes is forwarded to the
        ProjectDirectory entries created here.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        """Set the attribute; on the first inode assignment, populate README.

        The root directory additionally gets a 'by_id' subdirectory that is
        configured exactly like this one.
        """
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if name == 'inode' and self.inode is not None and not self._entries:
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                # storage_classes must be forwarded too, otherwise projects
                # reached through by_id would not use the configured classes.
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    self.pdh_only, storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Return True if `k` names an entry here, looking it up on demand.

        Names already present succeed immediately.  Otherwise `k` must look
        like a portable data hash or, unless pdh_only is set, a uuid.  Group
        uuids are resolved through the groups API (project and filter groups
        only); everything else is tried as a collection.  The new entry is
        kept only when its update() returns a true value.  Any exception is
        logged and reported as "not present".
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        e = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # An entry for k appeared while we were looking it up;
                    # keep that one and discard ours.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry named `item`, resolving it on demand.

        Raises KeyError when `item` cannot be resolved (see __contains__).
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        """Do nothing; entries of this directory are not dropped by clear()."""
        pass

    def want_event_subscribe(self):
        """Return True unless this directory is restricted to portable data hashes."""
        return not self.pdh_only
822
823
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        """Set up the directory.

        parent_inode, inodes and enable_write are handed to Directory.
        api is the API client, num_retries is passed to every request, and
        poll_time is stored as the polling interval.
        """
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found through a direct lookup in __getitem__.  update()
        # appends them to the server listing, which is capped at 1000 names.
        self._extra = set()

    def want_event_subscribe(self):
        """Return True: this directory asks for an event subscription."""
        return True

    @use_counter
    def update(self):
        """Refresh the entries from the distinct, non-empty tag names on the server.

        The request runs with the llfuse lock released.  Names remembered in
        self._extra are appended unless the listing already contains them, so
        merge() never sees the same tag name twice.
        """
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        if "items" in tags:
            listed = tags['items']
            listed_names = {t['name'] for t in listed}
            extras = [{"name": n} for n in self._extra if n not in listed_names]
            self.merge(listed + extras,
                       lambda i: i['name'],
                       lambda a, i: a.tag == i['name'],
                       lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                              i['name'], poll=self._poll, poll_time=self._poll_time))

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the TagDirectory named `item`.

        A name that is not among the current entries is looked up on the
        server.  If at least one tag link with that name exists, the name is
        remembered in self._extra and the directory is refreshed.  Raises
        KeyError (from the parent class) if the tag still cannot be found.
        """
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
        # Same guard as update(): tolerate a response without "items".
        if tags.get("items"):
            self._extra.add(item)
            self.update()
        return super(TagsDirectory, self).__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if `k` is an entry or can be resolved by __getitem__."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False
875             pass
876         return False
877
878
class TagDirectory(Directory):
    """Directory whose subdirectories are the collections, visible to the
    user, that carry one particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        """Remember the API client, retry count, tag name and polling settings.

        parent_inode, inodes and enable_write are handed to Directory.
        """
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        """Return True: this directory asks for an event subscription."""
        return True

    @use_counter
    def update(self):
        """Refresh the entries from the tag links named self.tag that point
        at collections.  The request runs with the llfuse lock released.
        """
        with llfuse.lock_released:
            link_filters = [['link_class', '=', 'tag'],
                            ['name', '=', self.tag],
                            ['head_uuid', 'is_a', 'arvados#collection']]
            request = self.api.links().list(filters=link_filters, select=['head_uuid'])
            tagged = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            # Entries are named after the collection uuid the link points at.
            return link['head_uuid']

        def same_entry(existing, link):
            return existing.collection_locator == link['head_uuid']

        def new_entry(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                       self._enable_write, link['head_uuid'])

        self.merge(tagged['items'], entry_name, same_entry, new_entry)
909
910
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        """Set up the directory for the project (or user) record `project_object`.

        parent_inode, inodes and enable_write are handed to Directory.
        api is the API client and num_retries is passed to its requests.
        poll and poll_time are stored and forwarded to subproject
        directories.  storage_classes, when not None, is applied to
        collections created by mkdir().
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self._current_user = None
        # Until items() is called, entries are resolved one at a time by
        # __getitem__ rather than by listing the whole project.
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        """Return True: this directory asks for an event subscription."""
        return True

    def createDirectory(self, i):
        """Build the (not yet registered) entry object for API record `i`.

        Returns a CollectionDirectory for collections and for links that
        point at a collection, a ProjectDirectory for groups, an ObjectFile
        for any other uuid, and None for records that cannot be shown.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                    i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file lives in this directory, so this directory's inode is
            # its parent, as for every other child created here.
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        """Return the uuid of the project this directory shows."""
        return self.project_uuid

    def items(self):
        """Switch to full-listing mode and return the directory items.

        Once the flag is set, update() fetches the complete project contents.
        """
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the (unsanitized) entry name for API record `i`, or None.

        Collections, groups and links to collections use their name as is.
        Other records with an 'arvados#...' kind get "<name>.<kind suffix>".
        Records with a missing or empty name, or that match none of the
        above, yield None.
        """
        name = i.get('name')
        if not name:
            return None

        # Missing uuid/head_kind/kind keys are treated as "no match" rather
        # than raising.
        uuid = i.get('uuid') or ''
        if collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name
        kind = i.get('kind')
        if kind and kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, in full-listing mode, the entries.

        Creates the '.arvados#project' object file on first use.  Outside
        full-listing mode nothing else is done.  Otherwise the project record
        and its subprojects and collections are fetched with the llfuse lock
        released, then merged into the entries with the lock held again.
        self._updating_lock serializes concurrent refreshes.

        Returns True on every path, including when another refresh has
        already made the directory fresh.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    # Somebody else refreshed while we waited for the lock.
                    return True

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key="uuid",
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[["uuid", "is_a", "arvados#group"],
                             ["groups.group_class", "in", ["project","filter"]]]))
                # Keep only collections that are their own current version.
                contents.extend(
                    c for c in arvados.util.keyset_list_all(
                        self.api.groups().contents,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        uuid=self.project_uuid,
                        filters=[["uuid", "is_a", "arvados#collection"]])
                    if c["current_version_uuid"] == c["uuid"])

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the entry for record `i`, register it under `name`, and return it."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named `k`.

        '.arvados#project' maps to the project object file.  In full-listing
        mode, or when `k` is already an entry, the parent class answers.
        Otherwise a project/filter group, and failing that a collection,
        owned by this project and named `k` (or its unsanitized form) is
        looked up on the server and added.  Raises KeyError if nothing
        matches.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if `k` is '.arvados#project' or can be resolved by __getitem__."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return whether the current user may write to this project.

        Always False when writing is disabled for the mount.  Otherwise the
        current user record is fetched once and cached, and its uuid is
        looked up in the project record's 'writable_by' list.
        """
        if not self._enable_write:
            return False
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        """Return True: this directory always reports itself as persisted."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called `name` owned by this project.

        Raises FUSEError(EROFS) when the project is not writable.  Any
        apiclient error from the create request is logged and reported as
        FUSEError(EEXIST).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection called `name` from this project.

        Raises FUSEError with EROFS (project not writable), ENOENT (no such
        entry), EPERM (entry is not a collection) or ENOTEMPTY (collection
        has content).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            self.api.collections().delete(uuid=ent.uuid()).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection `name_old` from project directory `src` here as `name_new`.

        Raises FUSEError(EROFS) when this project is not writable, and
        FUSEError(EPERM) when `src` is not a ProjectDirectory, the entry is
        not a collection, or `name_new` already exists here.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply the change event `ev` about one object to the entries.

        The old and new entry names are derived from the event's
        old_attributes and new_attributes.  A name is dropped when the
        object was not (old) or is not (new) owned by this project, or when
        it is trashed.  If the names differ, the entry is removed, added or
        moved accordingly.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1199
1200
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        """Set up the directory and fetch the current user record.

        parent_inode, inodes and enable_write are handed to Directory.
        api is the API client and num_retries is passed to its requests.
        poll_time and storage_classes are forwarded to the ProjectDirectory
        entries created by update().

        NOTE(review): `exclude` and `poll` are accepted but never used;
        polling is always switched on.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes

    @use_counter
    def update(self):
        """Refresh the entries from the projects shared with the current user.

        The lookups run with the llfuse lock released and are serialized by
        self._updating_lock.  When the API server advertises a
        groups().shared method it is paged through; otherwise the shared
        roots are computed from a full listing of projects.  Each root owner
        that has a name becomes an entry, as does each root project whose
        owner record was not returned.  Errors are logged, not raised.
        """
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    page = []
                    while True:
                        resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                                        order="uuid",
                                                        limit=10000,
                                                        count="none",
                                                        include="owner_uuid").execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        # Next page starts after the last uuid seen.
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['group_class','in',['project','filter']]],
                        select=["uuid", "owner_uuid"]))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A root is a project that is owned neither by the
                    # current user nor by another listed project.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])

                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                                  i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        """Return True: this directory asks for an event subscription."""
        return True