7de95a0cb1b0d95bd1d67dcc58b5a3c406a863ff
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
_logger = logging.getLogger('arvados.arvados_fuse')


# Characters that cannot appear in a FUSE/Linux path component: the NUL
# byte and the path separator.  Collection filenames containing them are
# shown in the mount with each such character replaced by an underscore.
_disallowed_filename_characters = re.compile('[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """Create an empty directory.

        :parent_inode: integer inode number of the parent directory.

        :inodes: inode table used to register, invalidate and delete
        entries.

        :apiconfig: callable returning the API server configuration dict
        (consulted by forward_slash_subst).

        :enable_write: stored for subclasses; this base class always reports
        itself as not writable.

        Raises TypeError (still an Exception subclass) if parent_inode is
        not an int.
        """
        super(Directory, self).__init__()

        # Assigned before _entries exists: subclasses may hook __setattr__
        # on 'inode' and only act once it becomes non-None.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise TypeError("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        """Return the configured substitute for '/' in names, or None.

        Read once from Collections.ForwardSlashNameSubstitution in the API
        config and cached on the instance.  A config without that key is
        treated as substituting '_'; an empty or '/' setting means no
        substitution (None).
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Map a collection/API name to a name usable in the mount.

        None stays None; '', '.' and '..' become '_', '_' and '__'.
        Otherwise '/' is replaced by the ForwardSlashNameSubstitution value
        from self.apiconfig (when one is configured), and any remaining
        disallowed character is replaced by '_'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)

    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Run update() if stale, logging (not raising) API HTTP errors."""
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        If several items map to the same (sanitized) name, the last one wins
        and the entries it displaced are released like stale entries, so
        they are not leaked in the inode table.
        """
        oldentries = self._entries
        self._entries = {}
        # (name, entry) pairs overwritten by a later item with the same name.
        displaced = []
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                ent = oldentries.pop(name)
            else:
                # self.inode may still be None here, so format with %s.
                _logger.debug("Adding entry '%s' to inode %s", name, self.inode)
                # create new directory entry
                ent = new_entry(i)
                if ent is None:
                    continue
                ent = self.inodes.add_entry(ent)
                changed = True
            if name in self._entries:
                displaced.append((name, self._entries[name]))
                changed = True
            self._entries[name] = ent

        # delete any other directory entries that were not in found in
        # 'items', plus any entries displaced by a duplicate name
        for name, ent in list(oldentries.items()) + displaced:
            _logger.debug("Forgetting about entry '%s' on inode %s", name, self.inode)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """True if this directory or any direct entry reports being in use."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children):
        """True if this directory, or any entry (checked with only_children=False), has a ref."""
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for ent in oldentries.values():
            ent.clear()
            self.inodes.del_entry(ent)
        self.invalidate()

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k, v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
255         raise NotImplementedError()
256
257
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
        """Create a directory view of a (sub)collection.

        :collection: the collection object backing this directory; may be
        None until a subclass loads it.

        :collection_root: the directory whose flush() is used to persist
        changes made through this directory.
        """
        # Directory.__init__ already stores apiconfig.
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.collection = collection
        self.collection_root = collection_root
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        """Add a FUSE entry for collection item `item` under `name`.

        An item that already carries a dead fuse_entry is re-attached here
        (reparenting after a rename); a subcollection becomes a child
        CollectionDirectoryBase; anything else becomes a FuseArvadosFile.
        The entry is recorded on the item as item.fuse_entry.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            #
            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock.  Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock.  If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            #
            lockcount = 0
            try:
                while True:
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                pass

            try:
                with llfuse.lock:
                    with self.collection.lock:
                        if event == arvados.collection.ADD:
                            self.new_entry(name, item, self.mtime())
                        elif event == arvados.collection.DEL:
                            # The name may have no entry (e.g. two collection
                            # names sanitized to the same mount name and the
                            # first DEL already removed it); don't raise
                            # KeyError back into the collection code.
                            ent = self._entries.pop(name, None)
                            self.inodes.invalidate_entry(self, name)
                            if ent is not None:
                                self.inodes.del_entry(ent)
                        elif event == arvados.collection.MOD:
                            if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                                self.inodes.invalidate_inode(item.fuse_entry)
                            elif name in self._entries:
                                self.inodes.invalidate_inode(self._entries[name])

                        if self.collection_record_file is not None:
                            self.collection_record_file.invalidate()
                            self.inodes.invalidate_inode(self.collection_record_file)
            finally:
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1

    def populate(self, mtime):
        """Subscribe to collection events and create entries for its items."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        self.collection_root.flush()

    @use_counter
    @check_update
    def create(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` in directory `src` to `name_new` in this directory.

        Raises FUSEError: EROFS if not writable, EPERM if src is not a
        collection directory, ENOENT if name_old does not exist in src,
        ENOTEMPTY/ENOTDIR/EISDIR for incompatible existing targets.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        # Report a missing source as ENOENT rather than letting
        # src[name_old] below raise a bare KeyError.
        if name_old not in src:
            raise llfuse.FUSEError(errno.ENOENT)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
431         self.collection = None
432
433
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        """Create a collection root directory.

        :collection_record: a collection record dict, a locator string
        (uuid or portable data hash), or None.

        :explicit_collection: accepted for interface compatibility; not used
        by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # writable() falls back to _writable until a Collection object is
        # loaded, so it must always be defined (it used to be left unset
        # when there was no locator, causing AttributeError).
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        else:
            self._writable = False
        self._updating_lock = threading.Lock()

    def same(self, i):
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Save (or update from) the server and refresh the cached record."""
        # Nothing to persist until a Collection object has been loaded.
        if self.collection is None or not self.writable():
            return
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        """True when the locator is a uuid (a PDH-addressed collection never changes)."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)
        self.fresh()

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self):
        """Refresh this directory from the API server / Keep.

        Returns True when the directory is up to date (including when there
        is nothing to refresh).  On a caught exception the error is logged,
        the directory is invalidated and False is returned.
        """
        # Bound before anything in the try can raise, so the error handlers
        # below can always read it.
        new_collection_record = None
        try:
            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
                # It's immutable, nothing to update
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    def collection_record(self):
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
621         super(CollectionDirectory, self).clear()
622         self._manifest_size = 0
623
624
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save()/save_new() are no-ops."""
        def save(self):
            pass
        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            return

        # See discussion in CollectionDirectoryBase.on_event
        lockcount = 0
        try:
            while True:
                self.collection.lock.release()
                lockcount += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock:
                with self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self)
        finally:
            while lockcount > 0:
                self.collection.lock.acquire()
                lockcount -= 1

    def collection_record(self):
        """Build a record dict for the scratch collection (uuid is always None)."""
        with llfuse.lock_released:
            return {
                "uuid": None,
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
                "storage_classes_desired": self.collection.storage_classes_desired(),
            }

    def __contains__(self, k):
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    @use_counter
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def persisted(self):
        return False

    def writable(self):
        return True

    def flush(self):
        pass

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        # Match CollectionDirectory.invalidate: also drop the kernel's cached
        # copy of the record file, not just our own cached content.
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(TmpCollectionDirectory, self).invalidate()
717             self.collection_record_file.invalidate()
718         super(TmpCollectionDirectory, self).invalidate()
719
720
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a name is referenced by lookup(), it is tested to see if it is a
    portable data hash or (unless ``pdh_only`` is set) an Arvados uuid.  If
    so, the matching project (group uuid) or collection (anything else) is
    loaded as a subdirectory of this directory, using that name.  Since
    querying a list of all extant keep locators is impractical, only objects
    that have already been accessed are visible to readdir().
    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        """
        :param pdh_only: if true, only portable data hashes are accepted as
            names; uuids are rejected by __contains__.
        :param storage_classes: passed to ProjectDirectory entries created on
            lookup, and to the ``by_id`` subdirectory of the root instance.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.  (Directory.__init__
        # assigns inode=None before _entries exists; the `is not None` test
        # short-circuits before _entries is read.)
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            # It must inherit every setting, including storage_classes, so that
            # projects opened through by_id/ behave the same as from the root.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    self.pdh_only, storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Return True if ``k`` names an accessible project or collection.

        Already-loaded entries are answered from ``self._entries``.  Otherwise,
        if ``k`` looks like a PDH (or, unless pdh_only, a uuid), the object is
        looked up, added to the inode table, and kept only if its update()
        succeeds.  Exceptions are logged and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        e = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Another entry for k was registered meanwhile; keep that
                    # one and discard the duplicate we just created.
                    self.inodes.del_entry(e)
                return True
            else:
                # The object could not be loaded: invalidate k in this
                # directory and drop the inode we allocated for it.
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for ``item``, loading it on demand; raise KeyError if absent."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        """Do nothing: this directory's entries are not cleared by this call."""
        pass

    def want_event_subscribe(self):
        """Return True unless this directory only serves portable data hashes."""
        return not self.pdh_only
814
815
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user.

    Each distinct tag link name becomes a TagDirectory.  Names that are found
    by direct lookup but are missing from the (capped) listing are remembered
    in ``self._extra`` so later refreshes keep them.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        # Polling is always enabled for the tag list; only the interval varies.
        self._poll = True
        self._poll_time = poll_time
        # Tag names confirmed by __getitem__ lookups.
        self._extra = set()

    def want_event_subscribe(self):
        """Always request event subscriptions for this directory."""
        return True

    @use_counter
    def update(self):
        """Re-list distinct tag names (up to 1000) and merge them into this directory."""
        with llfuse.lock_released:
            response = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        if "items" not in response:
            return

        def tag_name(record):
            return record['name']

        def same_tag(existing, record):
            return existing.tag == record['name']

        def new_tag_directory(record):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                record['name'], poll=self._poll, poll_time=self._poll_time)

        remembered = [{"name": extra_name} for extra_name in self._extra]
        self.merge(response['items'] + remembered, tag_name, same_tag, new_tag_directory)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the TagDirectory for ``item``, querying the server if it is not listed yet.

        Raises KeyError (from Directory.__getitem__) when no such tag exists.
        """
        if not super(TagsDirectory, self).__contains__(item):
            with llfuse.lock_released:
                found = self.api.links().list(
                    filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
                ).execute(num_retries=self.num_retries)
            if found["items"]:
                # Remember the tag so update() keeps it even if the listing omits it.
                self._extra.add(item)
                self.update()
        return super(TagsDirectory, self).__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if ``k`` is a known tag or can be found via __getitem__."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
869
870
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        """
        :param tag: the tag link name whose tagged collections are listed.
        """
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        """Always request event subscriptions for this directory."""
        return True

    @use_counter
    def update(self):
        """Refresh the collection subdirectories tagged with ``self.tag``.

        Each matching tag link contributes a CollectionDirectory named by the
        link's head_uuid.  The links are fetched with keyset pagination
        (arvados.util.keyset_list_all).  A single links().list call returns
        only one page of results, so collections past the first page would
        otherwise be missing from the directory.
        """
        with llfuse.lock_released:
            # Materialize inside lock_released: every page fetch is network I/O.
            taggedcollections = list(arvados.util.keyset_list_all(
                self.api.links().list,
                order_key="uuid",
                num_retries=self.num_retries,
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                # 'uuid' is the pagination key, so it must be selected too.
                select=['uuid', 'head_uuid']))
        self.merge(taggedcollections,
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
901
902
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Subprojects (and filter groups) and collections owned by the project
    appear as subdirectories, and the project record itself is exposed as
    '.arvados#project'.  Until items() has been called, update() skips the
    full project listing and __getitem__ looks names up individually on the
    API server.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        """
        :param project_object: API record (dict) for the project; its 'uuid'
            is stored as ``self.project_uuid``.
        :param storage_classes: if not None, sent as ``storage_classes_desired``
            for collections created by mkdir() and passed to subprojects.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Held for the duration of update(); see the comment there.
        self._updating_lock = threading.Lock()
        # users().current() record, fetched lazily by writable().
        self._current_user = None
        # Set by items(); enables the full listing in update().
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        """Always request event subscriptions for this directory."""
        return True

    def createDirectory(self, i):
        """Build the inode object for API record ``i``, or return None if it has none.

        Collections -> CollectionDirectory; groups -> ProjectDirectory; links
        whose head is a collection (or a PDH) -> CollectionDirectory for the
        head; any other uuid -> ObjectFile.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                    i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            # Event attributes may be partial, so don't assume head_kind is present.
            if i.get('head_kind') == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file lives in this directory, so its parent is this inode
            # (as for project_object_file in update()).
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        """Return the uuid of the project this directory represents."""
        return self.project_uuid

    def items(self):
        """Return all entries, switching this directory to full-listing mode first."""
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the entry name for API record ``i``, or None if it should not appear.

        Collections, groups and name links to collections use their 'name';
        other objects with an 'arvados#<kind>' kind get '<name>.<kind>'.
        Records without a non-empty name, or with missing uuid/kind fields,
        yield None instead of raising.
        """
        name = i.get('name')
        if not name:
            return None
        obj_uuid = i.get('uuid') or ''
        if collection_uuid_pattern.match(obj_uuid) or group_uuid_pattern.match(obj_uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(obj_uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name
        kind = i.get('kind') or ''
        if kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[8:])
        return None

    @use_counter
    def update(self):
        """Refresh this project's entries from the API server.

        Always makes sure the '.arvados#project' object file exists.  If no
        full listing has been requested (see items()), returns True without
        contacting the server.  Otherwise it re-fetches the project record,
        its project/filter subgroups and its current-version collections,
        merges them into the directory and returns True.  Returns None if the
        directory is no longer stale once the update lock is acquired.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                # Wait for _updating_lock only while the llfuse lock is
                # released: the thread holding _updating_lock re-acquires the
                # llfuse lock (for merge) before releasing it, so waiting here
                # with the llfuse lock held would deadlock.
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                        order_key="uuid",
                                                        num_retries=self.num_retries,
                                                        uuid=self.project_uuid,
                                                        filters=[["uuid", "is_a", "arvados#group"],
                                                                 ["groups.group_class", "in", ["project","filter"]]]))
                # Only current versions of collections, not old version snapshots.
                contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
                                       arvados.util.keyset_list_all(self.api.groups().contents,
                                                             order_key="uuid",
                                                             num_retries=self.num_retries,
                                                             uuid=self.project_uuid,
                                                             filters=[["uuid", "is_a", "arvados#collection"]])))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create and register the entry for record ``i`` under ``name``.

        Returns the registered entry, or None (registering nothing) when
        createDirectory() has no representation for the record.
        """
        ent = self.createDirectory(i)
        if ent is None:
            return None
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named ``k``, looking it up on the server if needed.

        Outside full-listing mode, an unknown name is searched among this
        project's subgroups first and then its collections; both the literal
        name and its unsanitized form are tried.  Raises KeyError if nothing
        matches.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if ``k`` is '.arvados#project' or resolves via __getitem__."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return True if writes are enabled and the current user is in the project's 'writable_by'."""
        if not self._enable_write:
            return False
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        """Return True."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection named ``name`` in this project.

        Raises FUSEError(EROFS) if the project is not writable, and
        FUSEError(EEXIST) for any API error from the create request.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection ``name`` from this project.

        Raises FUSEError with EROFS (not writable), ENOENT (no such entry),
        EPERM (entry is not a collection) or ENOTEMPTY (collection has files).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Resolve the entry once, with the llfuse lock held: __getitem__ is
        # wrapped in check_update and must not run inside lock_released.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        target_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=target_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection ``name_old`` from project directory ``src`` to ``name_new`` here.

        Raises FUSEError with EROFS (not writable) or EPERM (source is not a
        project, entry is not a collection, or ``name_new`` already exists).
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): unlike mkdir/rmdir this API call runs with the llfuse
        # lock held, which keeps the entry bookkeeping below free of races
        # with concurrent changes to src or self.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Update entries for a change event about an object in (or moving in/out of) this project.

        Reads ev['properties']['old_attributes'/'new_attributes'],
        ev['object_uuid'] and ev['object_owner_uuid'], and renames, adds or
        removes the corresponding entry.  The event dicts are not modified.
        """
        properties = ev.get("properties") or {}
        # Copy so that adding "uuid" doesn't mutate the caller's event.
        old_attrs = dict(properties.get("old_attributes") or {})
        new_attrs = dict(properties.get("new_attributes") or {})
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    # May register nothing if createDirectory() declines the record.
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1191
1192
1193 class SharedDirectory(Directory):
1194     """A special directory that represents users or groups who have shared projects with me."""
1195
1196     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
1197                  poll=False, poll_time=60, storage_classes=None):
1198         super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
1199         self.api = api
1200         self.num_retries = num_retries
1201         self.current_user = api.users().current().execute(num_retries=num_retries)
1202         self._poll = True
1203         self._poll_time = poll_time
1204         self._updating_lock = threading.Lock()
1205         self.storage_classes = storage_classes
1206
1207     @use_counter
1208     def update(self):
1209         try:
1210             with llfuse.lock_released:
1211                 self._updating_lock.acquire()
1212                 if not self.stale():
1213                     return
1214
1215                 contents = {}
1216                 roots = []
1217                 root_owners = set()
1218                 objects = {}
1219
1220                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1221                 if 'httpMethod' in methods.get('shared', {}):
1222                     page = []
1223                     while True:
1224                         resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1225                                                         order="uuid",
1226                                                         limit=10000,
1227                                                         count="none",
1228                                                         include="owner_uuid").execute()
1229                         if not resp["items"]:
1230                             break
1231                         page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1232                         for r in resp["items"]:
1233                             objects[r["uuid"]] = r
1234                             roots.append(r["uuid"])
1235                         for r in resp["included"]:
1236                             objects[r["uuid"]] = r
1237                             root_owners.add(r["uuid"])
1238                 else:
1239                     all_projects = list(arvados.util.keyset_list_all(
1240                         self.api.groups().list,
1241                         order_key="uuid",
1242                         num_retries=self.num_retries,
1243                         filters=[['group_class','in',['project','filter']]],
1244                         select=["uuid", "owner_uuid"]))
1245                     for ob in all_projects:
1246                         objects[ob['uuid']] = ob
1247
1248                     current_uuid = self.current_user['uuid']
1249                     for ob in all_projects:
1250                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1251                             roots.append(ob['uuid'])
1252                             root_owners.add(ob['owner_uuid'])
1253
1254                     lusers = arvados.util.keyset_list_all(
1255                         self.api.users().list,
1256                         order_key="uuid",
1257                         num_retries=self.num_retries,
1258                         filters=[['uuid','in', list(root_owners)]])
1259                     lgroups = arvados.util.keyset_list_all(
1260                         self.api.groups().list,
1261                         order_key="uuid",
1262                         num_retries=self.num_retries,
1263                         filters=[['uuid','in', list(root_owners)+roots]])
1264
1265                     for l in lusers:
1266                         objects[l["uuid"]] = l
1267                     for l in lgroups:
1268                         objects[l["uuid"]] = l
1269
1270                 for r in root_owners:
1271                     if r in objects:
1272                         obr = objects[r]
1273                         if obr.get("name"):
1274                             contents[obr["name"]] = obr
1275                         elif "first_name" in obr:
1276                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1277
1278                 for r in roots:
1279                     if r in objects:
1280                         obr = objects[r]
1281                         if obr['owner_uuid'] not in objects:
1282                             contents[obr["name"]] = obr
1283
1284             # end with llfuse.lock_released, re-acquire lock
1285
1286             self.merge(contents.items(),
1287                        lambda i: i[0],
1288                        lambda a, i: a.uuid() == i[1]['uuid'],
1289                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
1290                                                   i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1291         except Exception:
1292             _logger.exception("arv-mount shared dir error")
1293         finally:
1294             self._updating_lock.release()
1295
1296     def want_event_subscribe(self):
1297         return True