876825a370dc466a216ccb2053ae4f159e47fdf8
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
# Module logger shared by every directory class defined below.
_logger = logging.getLogger("arvados.arvados_fuse")


# Characters that FUSE / the Linux kernel will not accept inside a single
# path component: the NUL byte and the path separator.  Collection entry
# names containing them are shown in the mount with each such character
# replaced by an underscore (after any ForwardSlashNameSubstitution has
# already been applied to '/', see Directory.sanitize_filename).
_disallowed_filename_characters = re.compile("[\x00/]")
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """Create an empty directory.

        :parent_inode: integer inode number of the parent directory.

        :inodes: inode table in which the entries of this directory are
        registered (add_entry / del_entry / invalidate_*).

        :apiconfig: callable returning the cluster configuration dict; it is
        consulted lazily by forward_slash_subst().

        :enable_write: stored as ``self._enable_write``; subclasses combine
        it with their own checks in writable().

        Raises TypeError if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # Not known yet; presumably assigned when this directory is
        # registered with the inode table.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise TypeError("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        # Maps sanitized entry name -> File or Directory object.
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        """Return the string that replaces '/' in entry names, or None.

        The value is read once from the cluster config key
        Collections.ForwardSlashNameSubstitution and cached in ``_fsns``.
        If the key is missing (old API server) '_' is used; an empty or '/'
        setting disables substitution (None).
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Return a name usable as a FUSE directory entry.

        '/' is replaced according to ForwardSlashNameSubstitution (obtained
        from self.apiconfig via forward_slash_subst()), then any remaining
        disallowed character is replaced by '_'.  None stays None; '', '.'
        and '..' are mapped to '_', '_' and '__' respectively.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return whether this directory is backed by a saved record (never,
        for the generic directory)."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        API HTTP errors raised by update() are logged as warnings rather
        than propagated.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        # Return a copy so callers can iterate while entries change.
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Mark this directory as up to date and touch it in the inode table."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not in found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            # Tell the kernel the listing changed and bump our mtime.
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any direct entry is in use."""
        if super(Directory, self).in_use():
            return True
        for v in self._entries.values():
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """Return True if this directory (per the base class check) or any
        direct entry still has a reference."""
        if super(Directory, self).has_ref(only_children):
            return True
        for v in self._entries.values():
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            # Recursively clear child entries before dropping them.
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
256
257
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only the underlying Arvados `Collection` object.  The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
        """
        :collection: the (sub)collection object this directory mirrors; may
        be None until a subclass loads it.

        :collection_root: the directory at the root of this collection
        tree; flush() is delegated to it.
        """
        # Directory.__init__ already stores apiconfig on self.
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.collection = collection
        self.collection_root = collection_root
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the FUSE entry for collection item `item`.

        If the item already carries a dead `fuse_entry`, that entry is
        revived and reused; otherwise a subdirectory or file entry is
        created and registered with the inode table.  The resulting entry
        is stored back on ``item.fuse_entry``.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            #
            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock.  Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock.  If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            #
            lockcount = 0
            try:
                while True:
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                pass

            try:
                with llfuse.lock:
                    with self.collection.lock:
                        if event == arvados.collection.ADD:
                            self.new_entry(name, item, self.mtime())
                        elif event == arvados.collection.DEL:
                            ent = self._entries[name]
                            del self._entries[name]
                            self.inodes.invalidate_entry(self, name)
                            self.inodes.del_entry(ent)
                        elif event == arvados.collection.MOD:
                            if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                                self.inodes.invalidate_inode(item.fuse_entry)
                            elif name in self._entries:
                                self.inodes.invalidate_inode(self._entries[name])

                        if self.collection_record_file is not None:
                            self.collection_record_file.invalidate()
                            self.inodes.invalidate_inode(self.collection_record_file)
            finally:
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1

    def populate(self, mtime):
        """Subscribe to collection events and create entries for every
        current item, all stamped with `mtime`."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        self.collection_root.flush()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name`; EROFS if not writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory `name`; EROFS if not writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove file `name` and flush; EROFS if not writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove directory `name` and flush; EROFS if not writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` in directory `src` to `name_new` in this directory.

        Raises FUSEError with EROFS (not writable), EPERM (src is not a
        collection directory), ENOENT (name_old missing from src when an
        existing target has to be type-checked), ENOTEMPTY, ENOTDIR or
        EISDIR (incompatible existing target).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            if name_old not in src:
                # Without this check src[name_old] raises a bare KeyError
                # rather than a FUSE errno.
                raise llfuse.FUSEError(errno.ENOENT)
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Drop all entries and detach from the collection object."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
432
433
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        """
        :collection_record: either a collection record dict (its 'uuid' and
        'modified_at' are used) or a locator string (uuid or portable data
        hash), or None.

        :explicit_collection: accepted for compatibility; not used by this
        class.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are not swallowed.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Default so writable() never hits a missing attribute when there is
        # no locator and no collection object yet.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` refers to this collection."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Save (or merge remote changes into) the collection, if writable."""
        if not self.writable():
            return
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        """Return True when the locator is a uuid (not None / not a PDH)."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the collection object and rebuild all entries."""
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        """Update cached metadata (mtime, manifest size, locator) from a
        collection record and invalidate the .arvados#collection file."""
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)
        self.fresh()

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self):
        """Refresh the collection from the API server (or Keep) if stale.

        Returns True when nothing went wrong (including when there was
        nothing to do).  On error, logs it, invalidates this directory and
        returns False.
        """
        # Bound before anything that can raise, so every error handler
        # below can safely inspect it.
        new_collection_record = None
        try:
            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
                # It's immutable, nothing to update
                return True

            # Release _updating_lock only if this thread acquired it:
            # threading.Lock may be released by any thread, so an
            # unconditional release could drop another thread's lock.
            locked = False
            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    locked = True
                    if not self.stale():
                        # e.g. another thread refreshed it while we waited
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # too much prefetch and you end up stepping on your own transfers
                        # experimentally the optimal somewhere between 4 and 6
                        get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 6)
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries,
                                get_threads=get_threads)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries,
                                get_threads=get_threads)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                if locked:
                    self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    def collection_record(self):
        """Flush pending changes and return the collection's API record."""
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Mark stale, also invalidating the .arvados#collection file."""
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save (if writable) and stop the collection's background threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop collection threads, then drop all entries."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
628
629
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations are deliberate no-ops.

        The overrides accept and ignore any arguments, so callers that pass
        options to save()/save_new() still get the no-op instead of a
        TypeError.
        """
        def save(self, *args, **kwargs):
            pass
        def save_new(self, *args, **kwargs):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle a collection event, then invalidate the record file."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            return

        # See discussion in CollectionDirectoryBase.on_event
        lockcount = 0
        try:
            while True:
                self.collection.lock.release()
                lockcount += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock:
                with self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self)
        finally:
            while lockcount > 0:
                self.collection.lock.acquire()
                lockcount -= 1

    def collection_record(self):
        """Return a synthetic record (uuid None) describing the current
        manifest, PDH and desired storage classes."""
        with llfuse.lock_released:
            return {
                "uuid": None,
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
                "storage_classes_desired": self.collection.storage_classes_desired(),
            }

    def __contains__(self, k):
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    @use_counter
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def persisted(self):
        return False

    def writable(self):
        return True

    def flush(self):
        # Nothing is ever saved, so there is nothing to flush.
        pass

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
724
725
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        # When true, names that are not portable data hashes are rejected
        # by __contains__ (uuids are not looked up).
        self.pdh_only = pdh_only
        # Passed on to ProjectDirectory entries (and to the by_id
        # subdirectory) so collections created through them request these
        # storage classes.
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            # storage_classes is forwarded too; previously it was dropped, so
            # projects reached through by_id ignored the configured classes.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    self.pdh_only, storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Return True if ``k`` names an existing project or collection.

        Unknown names are looked up on the API server. A group uuid becomes a
        ProjectDirectory and anything else a CollectionDirectory. The entry is
        cached in this directory when its first update() succeeds. Lookup
        errors are logged and reported as "not present".
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Another lookup registered k meanwhile; drop our duplicate.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Entries are created on demand by lookups; nothing is cleared here.
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
818         return not self.pdh_only
819
820
class TagsDirectory(Directory):
    """Directory listing the tags visible to the user, one subdirectory per tag.

    A tag missing from the bulk listing can still be reached by name. A
    successful lookup remembers the tag, so later refreshes keep it.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup; merged into every refresh.
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        # Fetch the distinct tag names without holding the llfuse lock.
        with llfuse.lock_released:
            tag_filters = [['link_class', '=', 'tag'], ["name", "!=", ""]]
            response = self.api.links().list(
                filters=tag_filters, select=['name'], distinct=True, limit=1000,
            ).execute(num_retries=self.num_retries)
        if "items" not in response:
            return

        def tag_name(item):
            return item['name']

        def same_tag(existing, item):
            return existing.tag == item['name']

        def new_tag_dir(item):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                self._enable_write, item['name'],
                                poll=self._poll, poll_time=self._poll_time)

        remembered = [{"name": n} for n in self._extra]
        self.merge(response['items'] + remembered, tag_name, same_tag, new_tag_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        base = super(TagsDirectory, self)
        if base.__contains__(item):
            return base.__getitem__(item)
        with llfuse.lock_released:
            matches = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
        if matches["items"]:
            # Remember the tag so update() keeps it even if the bulk listing omits it.
            self._extra.add(item)
            self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
872             pass
873         return False
874
875
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Rebuild the subdirectories from every tag link named ``self.tag``.

        Each entry is a CollectionDirectory named after the tag link's
        head_uuid, i.e. the tagged collection.
        """
        with llfuse.lock_released:
            # A single links().list() call without a limit returns only one
            # page, so heavily used tags were silently truncated. Page through
            # every matching link instead.
            taggedcollections = list(arvados.util.keyset_list_all(
                self.api.links().list,
                order_key="uuid",
                num_retries=self.num_retries,
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['uuid', 'head_uuid']))
        self.merge(taggedcollections,
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
904                    lambda a, i: a.collection_locator == i['head_uuid'],
905                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
906
907
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes the body of update(), which runs with the llfuse lock released.
        self._updating_lock = threading.Lock()
        # users().current() record, fetched lazily by writable().
        self._current_user = None
        # Set by items(). Until then update() does not list the project, and
        # __getitem__ looks names up on the server one at a time.
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Return a new inode object for API record ``i``, or None if unsupported.

        Collections, and links whose head is a collection, become
        CollectionDirectory. Groups become ProjectDirectory. Any other record
        with a valid uuid becomes an ObjectFile.
        """
        uuid = i['uuid']
        if collection_uuid_pattern.match(uuid):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
        elif group_uuid_pattern.match(uuid):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                    i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(uuid):
            # head_kind may be absent (e.g. in event attributes); fall back
            # to checking whether head_uuid is a portable data hash.
            if i.get('head_kind') == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
            return None
        elif uuid_pattern.match(uuid):
            # The file is an entry of *this* directory, so its parent is our
            # inode (as for project_object_file), not our own parent's.
            return ObjectFile(self.inode, i)
        return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        # Listing everything switches this directory to full-listing mode.
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the entry name for API record ``i``, or None to hide it."""
        if 'name' not in i:
            return None
        name = i['name']
        if not name:
            return None
        uuid = i.get('uuid') or ''
        if collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name
        if 'kind' in i and i['kind'].startswith('arvados#'):
            # something else: append the kind without its "arvados#" prefix
            return "{}.{}".format(name, i['kind'][8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, in full-listing mode, its contents.

        Returns True after merging the contents, or immediately when
        full-listing mode is off. Returns None without doing anything if the
        directory is no longer stale once the update lock is acquired.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                        order_key="uuid",
                                                        num_retries=self.num_retries,
                                                        uuid=self.project_uuid,
                                                        filters=[["uuid", "is_a", "arvados#group"],
                                                                 ["groups.group_class", "in", ["project","filter"]]]))
                # Only the current version of each collection is listed.
                contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
                                       arvados.util.keyset_list_all(self.api.groups().contents,
                                                             order_key="uuid",
                                                             num_retries=self.num_retries,
                                                             uuid=self.project_uuid,
                                                             filters=[["uuid", "is_a", "arvados#collection"]])))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Register a new entry for API record ``i`` under ``name`` and return it.

        Returns None, and registers nothing, when createDirectory() does not
        support the record. Passing None to inodes.add_entry would fail.
        """
        ent = self.createDirectory(i)
        if ent is None:
            return None
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named ``k``, looking it up on the server if needed.

        Raises KeyError if no matching subproject or collection exists.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            # k may be a sanitized name; also query its unsanitized spelling.
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name == k:
                ent = self._add_entry(contents[0], name)
                if ent is not None:
                    return ent

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return True if writes are enabled and the current user can write the project."""
        if not self._enable_write:
            return False
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called ``name`` in this project.

        Raises llfuse.FUSEError(EROFS) if the project is not writable. Any
        apiclient error from the create call is logged and reported as
        llfuse.FUSEError(EEXIST).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection ``name`` from this project.

        Raises llfuse.FUSEError with EROFS (not writable), ENOENT (no such
        entry), EPERM (not a collection) or ENOTEMPTY (collection has entries).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Resolve the entry once, while still holding the llfuse lock; the
        # decorated __getitem__ must not run inside lock_released below.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        ent_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=ent_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection ``name_old`` from project ``src`` into this project as ``name_new``.

        Raises llfuse.FUSEError(EROFS) if this project is not writable, and
        EPERM if ``src`` is not a project, the entry is not a collection, or
        ``name_new`` already exists here.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): this API call runs with the llfuse lock held, unlike
        # mkdir/rmdir. Releasing it here would need care, since child_event()
        # could then change src/self entries before the bookkeeping below.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        # Keep the entry's parent_inode in sync with the directory that now
        # holds it, as createDirectory() does for new entries.
        ent.parent_inode = self.inode
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply an API event about one of this project's children.

        Adds, renames or removes the matching entry according to the
        object's old and new name, owner and trash state.
        """
        properties = ev.get("properties") or {}
        # Copy so that adding "uuid" below does not mutate the caller's event.
        old_attrs = dict(properties.get("old_attributes") or {})
        new_attrs = dict(properties.get("new_attributes") or {})
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1194             elif ent is not None:
1195                 self.inodes.del_entry(ent)
1196
1197
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the `poll` argument is ignored (polling is always on)
        # and `exclude` is not used by this class.
        self._poll = True
        self._poll_time = poll_time
        # Serializes the body of update(), which runs with the llfuse lock released.
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes

    def _shared_via_endpoint(self):
        """Collect shared projects with the groups.shared API endpoint.

        Returns (objects, roots, root_owners): records by uuid, uuids of the
        shared projects, and uuids of their owners.
        """
        objects = {}
        roots = []
        root_owners = set()
        page = []
        while True:
            resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                            order="uuid",
                                            limit=10000,
                                            count="none",
                                            include="owner_uuid").execute(num_retries=self.num_retries)
            if not resp["items"]:
                break
            # Keyset paging: next page starts after the last uuid seen.
            page = [["uuid", ">", resp["items"][-1]["uuid"]]]
            for r in resp["items"]:
                objects[r["uuid"]] = r
                roots.append(r["uuid"])
            for r in resp["included"]:
                objects[r["uuid"]] = r
                root_owners.add(r["uuid"])
        return objects, roots, root_owners

    def _shared_via_listing(self):
        """Collect shared projects by listing every readable project.

        Fallback for API servers without groups.shared. A project is a root
        when its owner is neither the current user nor another listed
        project. Returns (objects, roots, root_owners) like
        _shared_via_endpoint().
        """
        objects = {}
        roots = []
        root_owners = set()
        all_projects = list(arvados.util.keyset_list_all(
            self.api.groups().list,
            order_key="uuid",
            num_retries=self.num_retries,
            filters=[['group_class','in',['project','filter']]],
            select=["uuid", "owner_uuid"]))
        for ob in all_projects:
            objects[ob['uuid']] = ob

        current_uuid = self.current_user['uuid']
        for ob in all_projects:
            if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                roots.append(ob['uuid'])
                root_owners.add(ob['owner_uuid'])

        lusers = arvados.util.keyset_list_all(
            self.api.users().list,
            order_key="uuid",
            num_retries=self.num_retries,
            filters=[['uuid','in', list(root_owners)]])
        lgroups = arvados.util.keyset_list_all(
            self.api.groups().list,
            order_key="uuid",
            num_retries=self.num_retries,
            filters=[['uuid','in', list(root_owners)+roots]])

        for l in lusers:
            objects[l["uuid"]] = l
        for l in lgroups:
            objects[l["uuid"]] = l
        return objects, roots, root_owners

    @staticmethod
    def _shared_contents(objects, roots, root_owners):
        """Map display names to the records shown in this directory.

        Owners are listed by name, or "first last" for users. Root projects
        are listed directly only when their owner record is unknown.
        """
        contents = {}
        for r in root_owners:
            if r in objects:
                obr = objects[r]
                if obr.get("name"):
                    contents[obr["name"]] = obr
                elif "first_name" in obr:
                    contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

        for r in roots:
            if r in objects:
                obr = objects[r]
                if obr['owner_uuid'] not in objects:
                    contents[obr["name"]] = obr
        return contents

    @use_counter
    def update(self):
        """Refresh the list of users/groups/projects shared with the current user.

        Errors are logged and otherwise ignored, leaving the previous entries
        in place.
        """
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    objects, roots, root_owners = self._shared_via_endpoint()
                else:
                    objects, roots, root_owners = self._shared_via_listing()
                contents = self._shared_contents(objects, roots, root_owners)

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                                  i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        return True