bea7ed18c4fc4642e7c329f3245d093c688ff415
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile('[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.  Subclasses
    override update() to refresh the entries when the directory is stale,
    and override the write operations (create, mkdir, unlink, rmdir,
    rename) when they support modification.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """Initialize an empty directory.

        :parent_inode: the integer inode number of the parent directory.

        :inodes: the inode table, used to add, delete, touch and
        invalidate entries.

        :apiconfig: zero-argument callable returning the API server's
        configuration dict.

        :enable_write: whether write operations may be allowed (how it is
        honored is up to subclasses).

        Raises Exception if parent_inode is not an int.
        """
        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        """Return the ForwardSlashNameSubstitution string, or None.

        The value is read from the API config on first call and cached.
        A config without the key (older API servers) yields '_'; an empty
        string or '/' disables substitution and yields None.
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Return a FUSE-safe version of the filename ``dirty``.

        '/' is replaced according to ForwardSlashNameSubstitution (from
        self.apiconfig()), then any remaining disallowed characters become
        '_'.  '', '.' and '..' are mapped to '_', '_' and '__'; None is
        returned unchanged.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False: a plain Directory is not backed by a stored record."""
        return False

    def checkupdate(self):
        """Call update() if this directory is stale, logging HTTP errors."""
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Mark fresh and report the access to the inode table."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any direct entry is in use."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children):
        """Return True if this directory or any direct entry holds a ref."""
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate the kernel's dentry for this directory in its parent."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k, v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
255         raise NotImplementedError()
256
257
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
        """Wrap ``collection`` as a directory.

        :collection: the Collection (or subcollection) object shown here.

        :collection_root: the directory object whose flush() is invoked by
        this directory's flush().

        The other parameters are passed straight through to Directory,
        which also stores ``apiconfig`` on self.
        """
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.collection = collection
        self.collection_root = collection_root
        # Only created by subclasses that expose '.arvados#collection'.
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        """Add the FUSE entry for collection item ``item`` under ``name``.

        An item that already carries a ``fuse_entry`` has that entry reused;
        a subcollection gets a new, populated CollectionDirectoryBase; any
        other item gets a FuseArvadosFile.  The resulting entry is recorded
        on ``item.fuse_entry``.

        Raises Exception if a reused fuse_entry is not flagged dead or has
        no inode.
        """
        name = self.sanitize_filename(name)
        previous = getattr(item, "fuse_entry", None)
        if previous is not None:
            # Reparenting: the existing entry must be flagged dead but still
            # own a valid inode.
            if previous.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if previous.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            previous.dead = False
            self._entries[name] = previous
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig,
                                             self._enable_write, item, self.collection_root)
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            leaf = FuseArvadosFile(self.inode, item, mtime, self._enable_write)
            self._entries[name] = self.inodes.add_entry(leaf)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Mirror a Collection ADD/DEL/MOD notification into FUSE state.

        These events are emitted by operations on the Collection object
        (like "mkdirs" or "remove") and by "update"; only events for this
        directory's own collection are handled.
        """
        if not collection == self.collection:
            return

        name = self.sanitize_filename(name)

        # Another thread may hold llfuse.lock while waiting for
        # collection.lock, while we (having released llfuse.lock further up
        # the stack) still hold collection.lock.  Re-acquiring llfuse.lock
        # as-is would deadlock, so the correct order is llfuse.lock first,
        # then collection.lock.  collection.lock is an RLock that may be held
        # several times: drop every level (release() raises RuntimeError once
        # we no longer own it), count them, and restore the same depth after.
        released = 0
        try:
            while True:
                self.collection.lock.release()
                released += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock, self.collection.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    removed = self._entries.pop(name)
                    self.inodes.invalidate_entry(self, name)
                    self.inodes.del_entry(removed)
                elif event == arvados.collection.MOD:
                    fuse_entry = getattr(item, "fuse_entry", None)
                    if fuse_entry is not None:
                        self.inodes.invalidate_inode(fuse_entry)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name])

                # Any change makes the cached collection record stale.
                if self.collection_record_file is not None:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
        finally:
            for _ in range(released):
                self.collection.lock.acquire()

    def populate(self, mtime):
        """Subscribe to collection events and create entries for its items."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for child_name, child in self.collection.items():
                self.new_entry(child_name, child, self.mtime())

    def writable(self):
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        """Delegate to the collection root, which persists the whole tree."""
        self.collection_root.flush()

    def _require_writable(self):
        """Raise FUSEError(EROFS) unless this directory accepts writes."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file ``name`` in the collection."""
        self._require_writable()
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory ``name`` in the collection."""
        self._require_writable()
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove ``name`` from the collection, then flush."""
        self._require_writable()
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove directory ``name`` from the collection, then flush."""
        self._require_writable()
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move ``name_old`` from directory ``src`` to ``name_new`` here.

        An existing ``name_new`` is overwritten when both are files, or when
        both are directories and the target is empty; a directory onto a
        file raises ENOTDIR and a file onto a directory raises EISDIR.

        Raises FUSEError EROFS when not writable and EPERM when ``src`` is
        not a CollectionDirectoryBase.
        """
        self._require_writable()

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            target = self[name_new]
            moving_file = isinstance(moving, FuseArvadosFile)
            moving_dir = isinstance(moving, CollectionDirectoryBase)
            target_file = isinstance(target, FuseArvadosFile)
            target_dir = isinstance(target, CollectionDirectoryBase)
            if moving_file and target_file:
                pass  # plain overwrite
            elif moving_dir and target_dir:
                if len(target) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_dir and target_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_file and target_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
430         super(CollectionDirectoryBase, self).clear()
431         self.collection = None
432
433
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection.

    The collection is named by ``collection_locator`` (a uuid or a portable
    data hash) and its Collection object is loaded by update().  This
    directory is its own ``collection_root``.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        """Create a collection root directory.

        :api: API client; its ``config`` is used as apiconfig.

        :num_retries: retry count passed to Collection loading.

        :collection_record: a collection record dict (its 'uuid' and
        'modified_at' are used), a locator string, or None.

        :explicit_collection: accepted for interface compatibility; it is
        not used by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            # Not a bare except: let KeyboardInterrupt/SystemExit through.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always defined: writable() falls back to it whenever no Collection
        # object is loaded, including when there is no locator at all.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record ``i`` names this collection (uuid or PDH)."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Save pending changes and refresh the stored collection record.

        Does nothing when not writable or when no Collection object has
        been loaded yet (there is nothing to save).
        """
        if self.collection is None or not self.writable():
            return
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        """Return True when the locator is a uuid (PDH collections never change)."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the loaded Collection with ``coll_reader`` and repopulate."""
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        """Adopt metadata (mtime, manifest size, uuid) from a record dict.

        Raises Exception if the record is empty or None.
        """
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)
        self.fresh()

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self):
        """Load or refresh the collection.

        Returns True on success or when nothing needs refreshing; on error
        logs, invalidates this directory and returns False.
        """
        # Bound before anything can raise: the except handlers below read it.
        new_collection_record = None
        try:
            if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
                # It's immutable, nothing to update
                return True

            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # One fetch thread per 64 MiB of block cache
                        # (assumes cache_max is in bytes).  The divisor must
                        # be parenthesized: `x // 64 * 1024 * 1024` would
                        # multiply instead of divide.
                        get_threads = self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries,
                                get_threads=get_threads)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries,
                                get_threads=get_threads)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    def collection_record(self):
        """Flush, then return the Collection's API response."""
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Invalidate this directory and its '.arvados#collection' file."""
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save (if writable) and stop the Collection's threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the Collection's threads, then delete all entries."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
624         super(CollectionDirectory, self).clear()
625         self._manifest_size = 0
626
627
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations are deliberately no-ops."""

        def save(self):
            return None

        def save_new(self):
            return None

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        """Create the scratch collection and populate the directory.

        ``enable_write`` is accepted for interface compatibility; the
        directory is always constructed writable (see below).
        """
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # Writing is always enabled: nothing is ever saved to the backend,
        # so there is no server-side permission to respect.
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, scratch, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as the base class does, then invalidate the
        '.arvados#collection' file if it has been created."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            return

        # Same lock-ordering dance as CollectionDirectoryBase.on_event: drop
        # every level of the RLock, take llfuse.lock first, then restore.
        released = 0
        try:
            while True:
                self.collection.lock.release()
                released += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock, self.collection.lock:
                self.collection_record_file.invalidate()
                self.inodes.invalidate_inode(self.collection_record_file)
                _logger.debug("%s invalidated collection record", self)
        finally:
            for _ in range(released):
                self.collection.lock.acquire()

    def collection_record(self):
        """Build a record dict describing the current scratch contents."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            classes = self.collection.storage_classes_desired()
        return {
            "uuid": None,
            "manifest_text": manifest,
            "portable_data_hash": pdh,
            "storage_classes_desired": classes,
        }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        # Lazily create the JSON view of collection_record().
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def flush(self):
        pass

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if any), then this directory."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
720             self.collection_record_file.invalidate()
721         super(TmpCollectionDirectory, self).invalidate()
722
723
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    Unless ``pdh_only`` is set, collection and project UUIDs are accepted as
    well; project UUIDs are materialized as ProjectDirectory entries.
    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        # When true, only portable data hashes are resolved (UUIDs are refused).
        self.pdh_only = pdh_only
        # Handed to ProjectDirectory children created by lookups.
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            # storage_classes is forwarded too, so projects reached through
            # by_id/ behave the same as projects reached from the root.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    self.pdh_only, storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Return True if `k` is (or can be resolved into) an entry of this directory.

        Names that look like a portable data hash (or, unless pdh_only, a
        UUID) are looked up on the server; on success the resulting
        directory is added to the entries.  Lookup errors are logged and
        reported as "not present".
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        e = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

            if e.update():
                # Re-check membership: if another lookup added `k` meanwhile,
                # keep the existing entry and discard the one we just made.
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`, resolving it on demand; KeyError if unknown."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Entries are created on demand by lookups; nothing to clear here.
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
817
818
class TagsDirectory(Directory):
    """Directory whose subdirectories are the tags visible to the user.

    Every tag name is presented as a TagDirectory.  Tags discovered through a
    direct lookup are remembered in ``_extra`` so that subsequent listings
    keep showing them even when the bounded listing query omits them.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by __getitem__ lookups, merged into every listing.
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Re-list tag links and merge them (plus remembered tags) into the entries."""
        with llfuse.lock_released:
            response = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000,
            ).execute(num_retries=self.num_retries)
        if "items" not in response:
            return

        listed = response['items'] + [{"name": extra_name} for extra_name in self._extra]

        def make_tag_dir(record):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                record['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(listed,
                   lambda record: record['name'],
                   lambda existing, record: existing.tag == record['name'],
                   make_tag_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the TagDirectory for `item`, querying the server if it is not listed yet."""
        base = super(TagsDirectory, self)
        if not base.__contains__(item):
            with llfuse.lock_released:
                found = self.api.links().list(
                    filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
                ).execute(num_retries=self.num_retries)
            if found["items"]:
                # Remember the tag so update() keeps it in the listing.
                self._extra.add(item)
                self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
872
873
class TagDirectory(Directory):
    """Directory listing every collection visible to the user that carries a given tag.

    Each tagged collection appears as a CollectionDirectory named after the
    tag link's head_uuid.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Fetch the tag links for ``self.tag`` and merge their target collections."""
        link_filters = [['link_class', '=', 'tag'],
                        ['name', '=', self.tag],
                        ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            links = self.api.links().list(
                filters=link_filters,
                select=['head_uuid'],
            ).execute(num_retries=self.num_retries)

        def make_collection_dir(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                       self._enable_write, link['head_uuid'])

        self.merge(links['items'],
                   lambda link: link['head_uuid'],
                   lambda existing, link: existing.collection_locator == link['head_uuid'],
                   make_collection_dir)
904
905
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Subprojects (and filter groups) appear as ProjectDirectory entries and
    collections as CollectionDirectory entries.  The special name
    ``.arvados#project`` exposes the project record itself.  Until items() is
    called, entries are resolved one name at a time by __getitem__ instead of
    fetching the whole project listing.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # ObjectFile served for ".arvados#project"; created lazily by update().
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Held for the duration of update() so refreshes do not overlap.
        self._updating_lock = threading.Lock()
        # Cached users().current() response, fetched by writable().
        self._current_user = None
        # Set by items(); update() only fetches the full listing after that.
        self._full_listing = False
        # Forwarded to subprojects and applied to collections made by mkdir().
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Return a new inode object for record `i`, or None if it is not shown.

        `i` is an API record (collection, group or link), or the attribute
        dict of an event handled by child_event().
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                    i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            # Links pointing at a collection (by kind or by PDH) are shown as
            # that collection; other links are not shown.  head_kind may be
            # absent (e.g. in event attributes), so don't index it directly.
            if i.get('head_kind') == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file lives in this directory, so this directory's inode is
            # its parent (as for the ".arvados#project" file in update()).
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        # Listing the directory switches update() to full-listing mode.
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the directory entry name for record `i`, or None to hide it.

        Collections, projects and name links to collections use their own
        name; other Arvados objects get "<name>.<kind suffix>".
        """
        if 'name' not in i:
            return None
        name = i['name']
        if not name:
            # None or empty name: nothing sensible to show.
            return None

        obj_uuid = i.get('uuid')
        if obj_uuid is not None:
            if collection_uuid_pattern.match(obj_uuid) or group_uuid_pattern.match(obj_uuid):
                # collection or subproject
                return name
            if link_uuid_pattern.match(obj_uuid) and i.get('head_kind') == 'arvados#collection':
                # name link
                return name

        kind = i.get('kind')
        if kind and kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, once listed, the directory contents.

        Returns True when the directory is up to date.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            # Not listed yet: entries are resolved lazily by __getitem__.
            return True

        def samefn(a, i):
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    # Already fresh (possibly refreshed by another thread
                    # while we waited for _updating_lock).
                    return True

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                        order_key="uuid",
                                                        num_retries=self.num_retries,
                                                        uuid=self.project_uuid,
                                                        filters=[["uuid", "is_a", "arvados#group"],
                                                                 ["groups.group_class", "in", ["project","filter"]]]))
                # Only the current version of each collection is shown.
                contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
                                       arvados.util.keyset_list_all(self.api.groups().contents,
                                                             order_key="uuid",
                                                             num_retries=self.num_retries,
                                                             uuid=self.project_uuid,
                                                             filters=[["uuid", "is_a", "arvados#collection"]])))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create an entry for record `i` under `name`; return it, or None if unsupported."""
        ent = self.createDirectory(i)
        if ent is None:
            # createDirectory() does not handle this kind of record.
            return None
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named `k`, looking it up on the server if needed.

        Raises KeyError if no subproject or collection with that name exists.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            ent = self._add_entry(contents[0], name)
            if ent is not None:
                return ent

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return True if writes are enabled and the current user can write this project."""
        if not self._enable_write:
            return False
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called `name` in this project.

        Raises FUSEError(EROFS) if not writable, FUSEError(EEXIST) if the
        API rejects the creation.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        c = {
            "owner_uuid": self.project_uuid,
            "name": name,
            "manifest_text": "" }
        if self.storage_classes is not None:
            c["storage_classes_desired"] = self.storage_classes
        try:
            with llfuse.lock_released:
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection `name` from this project."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        # Resolve the uuid while still holding the llfuse lock: self[...]
        # touches shared state (and may call update()), so it must not run
        # inside lock_released.
        collection_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection `name_old` from project `src` into this project as `name_new`."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        try:
            ent = src[name_old]
        except KeyError:
            # Missing source: report ENOENT rather than a generic error.
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a create/update/delete event for an object owned by (or moved into/out of) this project."""
        properties = ev.get("properties") or {}
        # Copy the attribute dicts so adding "uuid" below does not mutate
        # the caller's event payload.
        old_attrs = dict(properties.get("old_attributes") or {})
        new_attrs = dict(properties.get("new_attributes") or {})
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1194
1195
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE: `poll` and `exclude` are accepted but not used here;
        # polling is always enabled for this directory.
        self._poll = True
        self._poll_time = poll_time
        # Held for the duration of update() so refreshes do not overlap.
        self._updating_lock = threading.Lock()
        # Forwarded to the ProjectDirectory children.
        self.storage_classes = storage_classes

    @use_counter
    def update(self):
        """Rebuild the set of owners/projects shared with the current user.

        Uses the groups "shared" API method when the server's discovery
        document advertises it; otherwise lists every readable project and
        computes the shared roots client-side.  Errors are logged and
        swallowed (best effort), leaving the previous listing in place.
        """
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    # Page through groups().shared using a uuid keyset filter.
                    page = []
                    while True:
                        resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                                        order="uuid",
                                                        limit=10000,
                                                        count="none",
                                                        include="owner_uuid").execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['group_class','in',['project','filter']]],
                        select=["uuid", "owner_uuid"]))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A project is a shared root when neither we nor any
                    # project we can read owns it.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])

                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                # Owners are named by group name or "first last" for users.
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Roots whose owner could not be resolved appear directly.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                                  i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        return True