18941: Need to leave some space for current block
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
# Module-wide logger; every message emitted from this file goes through it.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename: the NUL byte and the forward slash.  (If present in a
# collection filename, Directory.sanitize_filename() replaces each match
# with an underscore, so that is how they appear in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    Subclasses override update() to refresh the entries when the
    directory is stale, and override the mutation stubs (create, mkdir,
    unlink, rmdir, rename) when they support those operations.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """Set up an empty directory.

        :parent_inode: integer inode number of the parent directory

        :inodes: inode table; this class calls its touch(), add_entry(),
        del_entry(), invalidate_entry() and invalidate_inode() methods and
        indexes it by inode number

        :apiconfig: zero-argument callable returning the cluster
        configuration mapping (see forward_slash_subst)

        :enable_write: saved as self._enable_write for subclasses to consult

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        """Return the string that stands in for '/' in entry names, or None.

        The value is read once from
        config["Collections"]["ForwardSlashNameSubstitution"] and cached
        on the instance.  A missing key yields '_'; an empty string or
        '/' disables substitution (None).
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in the config returned by
        self.apiconfig().

        None is passed through unchanged; '' and '.' become '_' and '..'
        becomes '__'.  Otherwise '/' is replaced with the substitution
        string (when one is configured) and any remaining disallowed
        character is replaced with '_'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        """Refresh the entries; a no-op in the base class."""
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        """Return 0 (stub)."""
        return 0

    def persisted(self):
        """Return False in the base class."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning instead
        of being propagated.
        """
        if self.stale():
            try:
                self.update()
            except apiclient_errors.HttpError as e:
                # Logger.warn() is a deprecated alias of Logger.warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry stored under `item` (KeyError if absent)."""
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list snapshot of (name, entry) pairs."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if an entry named `k` exists."""
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        """Return the number of entries."""
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                # fn() asked for this item to be skipped
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries.pop(name)
            else:
                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not found in 'items'
        for name, ent in oldentries.items():
            _logger.debug("Forgetting about entry '%s' on inode %i", name, self.inode)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children):
        """Return True if this directory or any of its entries has a ref.

        `only_children` is forwarded to the base class check for this
        object; entries are always asked with False.
        """
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate this directory's entry on its parent directory."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        """Return the stored modification time."""
        return self._mtime

    def writable(self):
        """Return False: the base directory is read-only."""
        return False

    def flush(self):
        """Do nothing; subclasses with pending writes override this."""
        pass

    # The operations below are not supported by the base class;
    # subclasses override the ones they implement.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
256
257
class CollectionDirectoryBase(Directory):
    """Expose an Arvados Collection as a directory.

    Used directly for Subcollections, and as the base class of
    CollectionDirectory, which adds loading/saving of Collection
    records.

    Most operations act only on the underlying Arvados `Collection`
    object.  The `Collection` object reports additions, removals and
    modifications through a notify callback to
    `CollectionDirectoryBase.on_event`; FUSE inodes and directory
    entries are created, deleted or invalidated in response.

    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.apiconfig = apiconfig
        self.collection = collection
        self.collection_root = collection_root
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the directory entry for a collection item.

        An item that already carries a `fuse_entry` is reparented here,
        which is only allowed when that entry is dead and still has an
        inode.  Otherwise a subdirectory is created for a
        RichCollectionBase item and a FuseArvadosFile for anything else.
        """
        name = self.sanitize_filename(name)
        previous = getattr(item, "fuse_entry", None)
        if previous is not None:
            if previous.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if previous.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            previous.dead = False
            self._entries[name] = previous
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(
                self.inode, self.inodes, self.apiconfig,
                self._enable_write, item, self.collection_root)
            # Register the entry before populating it.
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            leaf = FuseArvadosFile(self.inode, item, mtime, self._enable_write)
            self._entries[name] = self.inodes.add_entry(leaf)
        item.fuse_entry = self._entries[name]

    def _drop_collection_lock(self):
        """Release collection.lock until this thread no longer owns it.

        collection.lock is an RLock that may have been acquired several
        times; releasing stops when release() raises RuntimeError.
        Returns the number of successful releases.
        """
        released = 0
        try:
            while True:
                self.collection.lock.release()
                released += 1
        except RuntimeError:
            pass
        return released

    def _retake_collection_lock(self, times):
        """Acquire collection.lock `times` times."""
        for _ in range(times):
            self.collection.lock.acquire()

    def _apply_event(self, event, name, item):
        """Update entries for one ADD/DEL/MOD event (locks already held)."""
        if event == arvados.collection.ADD:
            self.new_entry(name, item, self.mtime())
        elif event == arvados.collection.DEL:
            removed = self._entries.pop(name)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(removed)
        elif event == arvados.collection.MOD:
            attached = getattr(item, "fuse_entry", None)
            if attached is not None:
                self.inodes.invalidate_inode(attached)
            elif name in self._entries:
                self.inodes.invalidate_inode(self._entries[name])

    def on_event(self, event, collection, name, item):
        """Synchronize FUSE entries with an event from the Collection.

        Events (ADD/DEL/MOD) are emitted by operations on the Collection
        object (like "mkdirs" or "remove") and by "update".  Events for
        any collection other than self.collection are ignored.
        """
        if not (collection == self.collection):
            return

        name = self.sanitize_filename(name)

        # Lock ordering: llfuse.lock must be taken before the collection
        # lock.  At this point we released llfuse.lock earlier in the
        # stack but still hold the collection lock, while another thread
        # may hold llfuse.lock and be waiting for the collection lock.
        # Taking llfuse.lock now would deadlock, so first let go of the
        # collection lock completely (it is an RLock and may be held
        # more than once), do the work under both locks in the right
        # order, and finally restore the original hold count.
        hold_count = self._drop_collection_lock()
        try:
            with llfuse.lock:
                with self.collection.lock:
                    self._apply_event(event, name, item)

                    if self.collection_record_file is not None:
                        self.collection_record_file.invalidate()
                        self.inodes.invalidate_inode(self.collection_record_file)
        finally:
            self._retake_collection_lock(hold_count)

    def populate(self, mtime):
        """Subscribe to collection events and create entries for all items."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for child_name, child in self.collection.items():
                self.new_entry(child_name, child, self.mtime())

    def writable(self):
        """True when writes are enabled and the collection is writable."""
        return self._enable_write and self.collection.writable()

    def _require_writable(self):
        """Raise FUSEError(EROFS) unless this directory is writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

    @use_counter
    def flush(self):
        """Delegate flushing to the collection root directory."""
        self.collection_root.flush()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file called `name` in the collection."""
        self._require_writable()
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create a subdirectory called `name` in the collection."""
        self._require_writable()
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        self._require_writable()
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        self._require_writable()
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` here.

        Raises FUSEError with EROFS when this directory is read-only,
        EPERM when `src` is not a collection directory, and ENOTEMPTY,
        ENOTDIR or EISDIR when an existing target cannot be replaced by
        the source.  Both directories are flushed afterwards.
        """
        self._require_writable()

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            existing = self[name_new]
            moving_file = isinstance(moving, FuseArvadosFile)
            moving_dir = isinstance(moving, CollectionDirectoryBase)
            existing_file = isinstance(existing, FuseArvadosFile)
            existing_dir = isinstance(existing, CollectionDirectoryBase)
            if moving_file and existing_file:
                # A file may replace a file.
                pass
            elif moving_dir and existing_dir:
                # A directory may only replace an empty directory.
                if len(existing) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_dir and existing_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_file and existing_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the collection reference."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
432
433
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        """Set up the directory; the collection itself is loaded by update().

        :api: API client; its `config`, `keep` and `_rootDesc` attributes
        are used by this class

        :num_retries: passed to the Collection objects created by update()

        :collection_record: either a dict (its 'uuid' becomes the locator
        and 'modified_at' the mtime) or the locator itself, possibly None

        :explicit_collection: accepted but not used by the code in this class
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            # Narrowed from a bare "except:" so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable: writable() reads it while no collection
        # is loaded, including when there is no locator at all.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

    def same(self, i):
        """True if record `i` has this locator as its uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """True when writes are enabled and the collection (or, before it
        is loaded, the locator) allows writing."""
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Push local changes to the API server and refresh the record.

        Does nothing when the directory is not writable or no collection
        has been loaded yet.
        """
        if not self.writable():
            return
        if self.collection is None:
            # Nothing loaded, hence nothing to save (same guard as finalize()).
            return
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        """True when the locator is a uuid (as opposed to None or a
        portable data hash)."""
        locator = self.collection_locator
        return locator is not None and uuid_pattern.match(locator) is not None

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the backing collection object and rebuild the entries."""
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        """Adopt mtime, manifest size and uuid from a collection record.

        Also invalidates the '.arvados#collection' file if it exists and
        marks the directory fresh.  Raises Exception on an empty record.
        """
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)
        self.fresh()

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self):
        """Load or refresh the collection.

        Returns True on success (including when there is nothing to do)
        and False after logging an error, in which case the directory is
        invalidated.
        """
        # Bound before the outer try so the except handlers below can
        # always inspect it, whichever statement raised.
        new_collection_record = None
        try:
            if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
                # It's immutable, nothing to update
                return True

            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # One thread per 64 MiB of block cache, minus one to
                        # leave some space for the current block, never
                        # below zero.  (64 MiB is presumably the Keep block
                        # size -- confirm against the SDK.)
                        get_threads = max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 0)
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries,
                                get_threads=get_threads)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries,
                                get_threads=get_threads)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    def collection_record(self):
        """Flush pending changes and return the collection's API response."""
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file,
        which is created on first access."""
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#collection' is always present; otherwise defer to the
        base class."""
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Invalidate the record file (if any), then this directory."""
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """True when there is a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable and stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop collection threads, delete all entries and reset the
        manifest size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
627
628
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations are no-ops."""

        # Accept (and ignore) any arguments so that a caller passing the
        # base class's save parameters gets the intended no-op rather
        # than a TypeError.
        def save(self, *args, **kwargs):
            pass
        def save_new(self, *args, **kwargs):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        """Create an empty scratch collection and populate the directory.

        `enable_write` is accepted for signature compatibility but the
        directory is always created writable (see below).
        """
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle a collection event, then invalidate the record file.

        Unlike the base class, the record file is invalidated for every
        event received, whichever collection object it refers to.
        """
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            return

        # See discussion in CollectionDirectoryBase.on_event: the
        # collection lock (an RLock, possibly held several times) must be
        # fully released before taking llfuse.lock, then restored.
        lockcount = 0
        try:
            while True:
                self.collection.lock.release()
                lockcount += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock:
                with self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self)
        finally:
            while lockcount > 0:
                self.collection.lock.acquire()
                lockcount -= 1

    def collection_record(self):
        """Return a record dict describing the current scratch contents.

        'uuid' is always None because the collection is never saved.
        """
        with llfuse.lock_released:
            return {
                "uuid": None,
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
                "storage_classes_desired": self.collection.storage_classes_desired(),
            }

    def __contains__(self, k):
        """'.arvados#collection' is always present; otherwise defer to the
        base class."""
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    @use_counter
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file,
        which is created on first access."""
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def persisted(self):
        """Return False: this collection is never saved."""
        return False

    def writable(self):
        """Return True: the scratch directory is always writable."""
        return True

    def flush(self):
        """Do nothing: there is no backend record to update."""
        pass

    def want_event_subscribe(self):
        """Return False."""
        return False

    def finalize(self):
        """Stop the collection's threads."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if any), then this directory."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
723
724
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        """Create an initially empty magic directory.

        api is the API client used for lookups (its config is handed to the
        base class) and num_retries is passed to the API requests made here.
        When pdh_only is true, only portable data hashes are accepted as
        names.  storage_classes is forwarded to the ProjectDirectory objects
        created on lookup.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        """Set the attribute; on first inode assignment populate README/by_id."""
        super(MagicDirectory, self).__setattr__(name, value)
        # Only react to a real inode being assigned while the directory is
        # still empty.  The checks are ordered so that self.inode and
        # self._entries are not read before they exist.
        if name != 'inode' or self.inode is None or self._entries:
            return

        # When we're assigned an inode, add a README.
        self._entries['README'] = self.inodes.add_entry(
            StringFile(self.inode, self.README_TEXT, time.time()))
        # If we're the root directory, add an identical by_id subdirectory.
        # It gets the same storage_classes as this directory so that projects
        # reached through by_id behave like those reached directly.
        if self.inode == llfuse.ROOT_INODE:
            self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                pdh_only=self.pdh_only, storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Return True if k is, or can be loaded as, an entry of this directory.

        Names already present are accepted immediately.  Otherwise k must
        look like a portable data hash or (unless pdh_only is set) a uuid.
        Group uuids are looked up as projects; anything else is tried as a
        collection.  Errors during lookup are logged and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        e = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

            if e.update():
                # Re-check membership after update(): keep the existing
                # entry and discard the new one if k has appeared meanwhile.
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry named item, loading it on demand; KeyError if absent."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        """Do nothing: entries of this directory are never cleared."""
        pass

    def want_event_subscribe(self):
        """Return True unless this directory is restricted to portable data hashes."""
        return not self.pdh_only
818
819
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        """Create the tags directory; api.config is handed to the base class."""
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        # Tag names found by direct lookup in __getitem__; they are appended
        # to the server listing on every update().
        self._extra = set()
        self._poll = True
        self._poll_time = poll_time

    def want_event_subscribe(self):
        """Return True: this directory asks for event subscription."""
        return True

    @use_counter
    def update(self):
        """List tag names from the API and merge them into the entries."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000)
            tags = request.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(item):
            return item['name']

        def is_same_tag(entry, item):
            return entry.tag == item['name']

        def make_tag_directory(item):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                item['name'], poll=self._poll, poll_time=self._poll_time)

        listing = tags['items'] + [{"name": extra} for extra in self._extra]
        self.merge(listing, tag_name, is_same_tag, make_tag_directory)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the directory for tag item, querying the API if it is not listed yet."""
        base = super(TagsDirectory, self)
        if base.__contains__(item):
            return base.__getitem__(item)

        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1)
            tags = request.execute(num_retries=self.num_retries)
        if tags["items"]:
            # Remember the tag so later listings keep it, then refresh.
            self._extra.add(item)
            self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if tag k is listed or can be found through __getitem__."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
873
874
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        """Create the directory for tag; api.config is handed to the base class."""
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.tag = tag
        self.api = api
        self.num_retries = num_retries
        self._poll_time = poll_time
        self._poll = poll

    def want_event_subscribe(self):
        """Return True: this directory asks for event subscription."""
        return True

    @use_counter
    def update(self):
        """List the collections carrying this tag and merge them into the entries."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid'])
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def is_same_collection(entry, link):
            return entry.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                       self._enable_write, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name,
                   is_same_collection,
                   make_collection_directory)
905
906
907 class ProjectDirectory(Directory):
908     """A special directory that contains the contents of a project."""
909
910     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
911                  poll=True, poll_time=3, storage_classes=None):
912         super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
913         self.api = api
914         self.num_retries = num_retries
915         self.project_object = project_object
916         self.project_object_file = None
917         self.project_uuid = project_object['uuid']
918         self._poll = poll
919         self._poll_time = poll_time
920         self._updating_lock = threading.Lock()
921         self._current_user = None
922         self._full_listing = False
923         self.storage_classes = storage_classes
924
925     def want_event_subscribe(self):
926         return True
927
928     def createDirectory(self, i):
929         if collection_uuid_pattern.match(i['uuid']):
930             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
931         elif group_uuid_pattern.match(i['uuid']):
932             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
933                                     i, self._poll, self._poll_time, self.storage_classes)
934         elif link_uuid_pattern.match(i['uuid']):
935             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
936                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
937             else:
938                 return None
939         elif uuid_pattern.match(i['uuid']):
940             return ObjectFile(self.parent_inode, i)
941         else:
942             return None
943
944     def uuid(self):
945         return self.project_uuid
946
947     def items(self):
948         self._full_listing = True
949         return super(ProjectDirectory, self).items()
950
951     def namefn(self, i):
952         if 'name' in i:
953             if i['name'] is None or len(i['name']) == 0:
954                 return None
955             elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
956                 # collection or subproject
957                 return i['name']
958             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
959                 # name link
960                 return i['name']
961             elif 'kind' in i and i['kind'].startswith('arvados#'):
962                 # something else
963                 return "{}.{}".format(i['name'], i['kind'][8:])
964         else:
965             return None
966
967
968     @use_counter
969     def update(self):
970         if self.project_object_file == None:
971             self.project_object_file = ObjectFile(self.inode, self.project_object)
972             self.inodes.add_entry(self.project_object_file)
973
974         if not self._full_listing:
975             return True
976
977         def samefn(a, i):
978             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
979                 return a.uuid() == i['uuid']
980             elif isinstance(a, ObjectFile):
981                 return a.uuid() == i['uuid'] and not a.stale()
982             return False
983
984         try:
985             with llfuse.lock_released:
986                 self._updating_lock.acquire()
987                 if not self.stale():
988                     return
989
990                 if group_uuid_pattern.match(self.project_uuid):
991                     self.project_object = self.api.groups().get(
992                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
993                 elif user_uuid_pattern.match(self.project_uuid):
994                     self.project_object = self.api.users().get(
995                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
996                 # do this in 2 steps until #17424 is fixed
997                 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
998                                                         order_key="uuid",
999                                                         num_retries=self.num_retries,
1000                                                         uuid=self.project_uuid,
1001                                                         filters=[["uuid", "is_a", "arvados#group"],
1002                                                                  ["groups.group_class", "in", ["project","filter"]]]))
1003                 contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
1004                                        arvados.util.keyset_list_all(self.api.groups().contents,
1005                                                              order_key="uuid",
1006                                                              num_retries=self.num_retries,
1007                                                              uuid=self.project_uuid,
1008                                                              filters=[["uuid", "is_a", "arvados#collection"]])))
1009
1010
1011             # end with llfuse.lock_released, re-acquire lock
1012
1013             self.merge(contents,
1014                        self.namefn,
1015                        samefn,
1016                        self.createDirectory)
1017             return True
1018         finally:
1019             self._updating_lock.release()
1020
1021     def _add_entry(self, i, name):
1022         ent = self.createDirectory(i)
1023         self._entries[name] = self.inodes.add_entry(ent)
1024         return self._entries[name]
1025
1026     @use_counter
1027     @check_update
1028     def __getitem__(self, k):
1029         if k == '.arvados#project':
1030             return self.project_object_file
1031         elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1032             return super(ProjectDirectory, self).__getitem__(k)
1033         with llfuse.lock_released:
1034             k2 = self.unsanitize_filename(k)
1035             if k2 == k:
1036                 namefilter = ["name", "=", k]
1037             else:
1038                 namefilter = ["name", "in", [k, k2]]
1039             contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
1040                                                        ["group_class", "in", ["project","filter"]],
1041                                                        namefilter],
1042                                               limit=2).execute(num_retries=self.num_retries)["items"]
1043             if not contents:
1044                 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
1045                                                                 namefilter],
1046                                                        limit=2).execute(num_retries=self.num_retries)["items"]
1047         if contents:
1048             if len(contents) > 1 and contents[1]['name'] == k:
1049                 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1050                 # "foo[SUBST]bar".
1051                 contents = [contents[1]]
1052             name = self.sanitize_filename(self.namefn(contents[0]))
1053             if name != k:
1054                 raise KeyError(k)
1055             return self._add_entry(contents[0], name)
1056
1057         # Didn't find item
1058         raise KeyError(k)
1059
1060     def __contains__(self, k):
1061         if k == '.arvados#project':
1062             return True
1063         try:
1064             self[k]
1065             return True
1066         except KeyError:
1067             pass
1068         return False
1069
1070     @use_counter
1071     @check_update
1072     def writable(self):
1073         if not self._enable_write:
1074             return False
1075         with llfuse.lock_released:
1076             if not self._current_user:
1077                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1078             return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1079
1080     def persisted(self):
1081         return True
1082
1083     @use_counter
1084     @check_update
1085     def mkdir(self, name):
1086         if not self.writable():
1087             raise llfuse.FUSEError(errno.EROFS)
1088
1089         try:
1090             with llfuse.lock_released:
1091                 c = {
1092                     "owner_uuid": self.project_uuid,
1093                     "name": name,
1094                     "manifest_text": "" }
1095                 if self.storage_classes is not None:
1096                     c["storage_classes_desired"] = self.storage_classes
1097                 try:
1098                     self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1099                 except Exception as e:
1100                     raise
1101             self.invalidate()
1102         except apiclient_errors.Error as error:
1103             _logger.error(error)
1104             raise llfuse.FUSEError(errno.EEXIST)
1105
1106     @use_counter
1107     @check_update
1108     def rmdir(self, name):
1109         if not self.writable():
1110             raise llfuse.FUSEError(errno.EROFS)
1111
1112         if name not in self:
1113             raise llfuse.FUSEError(errno.ENOENT)
1114         if not isinstance(self[name], CollectionDirectory):
1115             raise llfuse.FUSEError(errno.EPERM)
1116         if len(self[name]) > 0:
1117             raise llfuse.FUSEError(errno.ENOTEMPTY)
1118         with llfuse.lock_released:
1119             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1120         self.invalidate()
1121
1122     @use_counter
1123     @check_update
1124     def rename(self, name_old, name_new, src):
1125         if not self.writable():
1126             raise llfuse.FUSEError(errno.EROFS)
1127
1128         if not isinstance(src, ProjectDirectory):
1129             raise llfuse.FUSEError(errno.EPERM)
1130
1131         ent = src[name_old]
1132
1133         if not isinstance(ent, CollectionDirectory):
1134             raise llfuse.FUSEError(errno.EPERM)
1135
1136         if name_new in self:
1137             # POSIX semantics for replacing one directory with another is
1138             # tricky (the target directory must be empty, the operation must be
1139             # atomic which isn't possible with the Arvados API as of this
1140             # writing) so don't support that.
1141             raise llfuse.FUSEError(errno.EPERM)
1142
1143         self.api.collections().update(uuid=ent.uuid(),
1144                                       body={"owner_uuid": self.uuid(),
1145                                             "name": name_new}).execute(num_retries=self.num_retries)
1146
1147         # Acually move the entry from source directory to this directory.
1148         del src._entries[name_old]
1149         self._entries[name_new] = ent
1150         self.inodes.invalidate_entry(src, name_old)
1151
1152     @use_counter
1153     def child_event(self, ev):
1154         properties = ev.get("properties") or {}
1155         old_attrs = properties.get("old_attributes") or {}
1156         new_attrs = properties.get("new_attributes") or {}
1157         old_attrs["uuid"] = ev["object_uuid"]
1158         new_attrs["uuid"] = ev["object_uuid"]
1159         old_name = self.sanitize_filename(self.namefn(old_attrs))
1160         new_name = self.sanitize_filename(self.namefn(new_attrs))
1161
1162         # create events will have a new name, but not an old name
1163         # delete events will have an old name, but not a new name
1164         # update events will have an old and new name, and they may be same or different
1165         # if they are the same, an unrelated field changed and there is nothing to do.
1166
1167         if old_attrs.get("owner_uuid") != self.project_uuid:
1168             # Was moved from somewhere else, so don't try to remove entry.
1169             old_name = None
1170         if ev.get("object_owner_uuid") != self.project_uuid:
1171             # Was moved to somewhere else, so don't try to add entry
1172             new_name = None
1173
1174         if old_attrs.get("is_trashed"):
1175             # Was previously deleted
1176             old_name = None
1177         if new_attrs.get("is_trashed"):
1178             # Has been deleted
1179             new_name = None
1180
1181         if new_name != old_name:
1182             ent = None
1183             if old_name in self._entries:
1184                 ent = self._entries[old_name]
1185                 del self._entries[old_name]
1186                 self.inodes.invalidate_entry(self, old_name)
1187
1188             if new_name:
1189                 if ent is not None:
1190                     self._entries[new_name] = ent
1191                 else:
1192                     self._add_entry(new_attrs, new_name)
1193             elif ent is not None:
1194                 self.inodes.del_entry(ent)
1195
1196
1197 class SharedDirectory(Directory):
1198     """A special directory that represents users or groups who have shared projects with me."""
1199
1200     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
1201                  poll=False, poll_time=60, storage_classes=None):
1202         super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
1203         self.api = api
1204         self.num_retries = num_retries
1205         self.current_user = api.users().current().execute(num_retries=num_retries)
1206         self._poll = True
1207         self._poll_time = poll_time
1208         self._updating_lock = threading.Lock()
1209         self.storage_classes = storage_classes
1210
1211     @use_counter
1212     def update(self):
1213         try:
1214             with llfuse.lock_released:
1215                 self._updating_lock.acquire()
1216                 if not self.stale():
1217                     return
1218
1219                 contents = {}
1220                 roots = []
1221                 root_owners = set()
1222                 objects = {}
1223
1224                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1225                 if 'httpMethod' in methods.get('shared', {}):
1226                     page = []
1227                     while True:
1228                         resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1229                                                         order="uuid",
1230                                                         limit=10000,
1231                                                         count="none",
1232                                                         include="owner_uuid").execute()
1233                         if not resp["items"]:
1234                             break
1235                         page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1236                         for r in resp["items"]:
1237                             objects[r["uuid"]] = r
1238                             roots.append(r["uuid"])
1239                         for r in resp["included"]:
1240                             objects[r["uuid"]] = r
1241                             root_owners.add(r["uuid"])
1242                 else:
1243                     all_projects = list(arvados.util.keyset_list_all(
1244                         self.api.groups().list,
1245                         order_key="uuid",
1246                         num_retries=self.num_retries,
1247                         filters=[['group_class','in',['project','filter']]],
1248                         select=["uuid", "owner_uuid"]))
1249                     for ob in all_projects:
1250                         objects[ob['uuid']] = ob
1251
1252                     current_uuid = self.current_user['uuid']
1253                     for ob in all_projects:
1254                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1255                             roots.append(ob['uuid'])
1256                             root_owners.add(ob['owner_uuid'])
1257
1258                     lusers = arvados.util.keyset_list_all(
1259                         self.api.users().list,
1260                         order_key="uuid",
1261                         num_retries=self.num_retries,
1262                         filters=[['uuid','in', list(root_owners)]])
1263                     lgroups = arvados.util.keyset_list_all(
1264                         self.api.groups().list,
1265                         order_key="uuid",
1266                         num_retries=self.num_retries,
1267                         filters=[['uuid','in', list(root_owners)+roots]])
1268
1269                     for l in lusers:
1270                         objects[l["uuid"]] = l
1271                     for l in lgroups:
1272                         objects[l["uuid"]] = l
1273
1274                 for r in root_owners:
1275                     if r in objects:
1276                         obr = objects[r]
1277                         if obr.get("name"):
1278                             contents[obr["name"]] = obr
1279                         elif "first_name" in obr:
1280                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1281
1282                 for r in roots:
1283                     if r in objects:
1284                         obr = objects[r]
1285                         if obr['owner_uuid'] not in objects:
1286                             contents[obr["name"]] = obr
1287
1288             # end with llfuse.lock_released, re-acquire lock
1289
1290             self.merge(contents.items(),
1291                        lambda i: i[0],
1292                        lambda a, i: a.uuid() == i[1]['uuid'],
1293                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
1294                                                   i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1295         except Exception:
1296             _logger.exception("arv-mount shared dir error")
1297         finally:
1298             self._updating_lock.release()
1299
1300     def want_event_subscribe(self):
1301         return True