Merge branch '21600-banner-tests'
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
# Module-level logger shared by the directory classes defined in this file.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename: the NUL byte and the forward slash.  (If present in a
# collection filename, they will appear as underscores in the fuse
# mount; see Directory.sanitize_filename.)
_disallowed_filename_characters = re.compile(r'[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    Subclasses override update() to refresh the entries when the directory
    is stale, and override the create/mkdir/unlink/rmdir/rename and
    want_event_subscribe hooks, all of which raise NotImplementedError here.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
        """Initialize an empty directory.

        :parent_inode: integer inode number of the parent directory

        :inodes: inode table object used to register, touch, invalidate and
        delete entries

        :apiconfig: callable returning the cluster configuration mapping
        (consulted for Collections.ForwardSlashNameSubstitution)

        :enable_write: flag stored as self._enable_write for subclasses
        that support writing

        :filters: list of API filters, or None for no filters

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write
        self._filters = filters or []

    def _filters_for(self, subtype, *, qualified):
        """Yield the configured filters that apply to `subtype`.

        Each filter's first element is an attribute name, optionally
        written as 'type.attribute'.  Filters without a type prefix are
        always yielded unchanged.  Filters prefixed with a different type
        are skipped.  Filters prefixed with `subtype` are yielded as-is when
        `qualified` is true, or with the 'type.' prefix stripped otherwise.
        """
        for f in self._filters:
            f_type, _, f_name = f[0].partition('.')
            if not f_name:
                yield f
            elif f_type != subtype:
                pass
            elif qualified:
                yield f
            else:
                yield [f_name, *f[1:]]

    def forward_slash_subst(self):
        """Return the string substituted for '/' in names, or None.

        The value is read once from the cluster configuration and cached
        in self._fsns.  An empty string or '/' in the configuration means
        no substitution (None).
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.apiconfig.

        Returns None for None; '', '.' and '..' map to '_', '_' and '__'
        respectively.  Otherwise '/' is replaced with the configured
        substitution (if any) and every remaining disallowed character is
        replaced with '_'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; the generic directory has no backing record."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        HTTP errors raised by the API client are logged as warnings and
        otherwise ignored.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias that was removed in
                # Python 3.13; warning() is the supported spelling.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named `item`; raises KeyError if absent."""
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list snapshot of (name, entry) pairs."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if an entry named `k` exists."""
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        """Return the number of entries."""
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        for v in self._entries.values():
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """Return True if this directory or any entry reports a reference.

        `only_children` is forwarded to the base class check for this
        object; entries are always queried with False.
        """
        if super(Directory, self).has_ref(only_children):
            return True
        for v in self._entries.values():
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate this directory's name in its parent directory."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        """Return the stored modification time."""
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        """No-op; subclasses with pending state override this."""
        pass

    # The operations below have no generic implementation; subclasses
    # that support them must override.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
269
270
class CollectionDirectoryBase(Directory):
    """Directory view of an Arvados Collection.

    Instantiated directly for subcollections, and extended by
    CollectionDirectory, which adds loading and saving of Collection
    records.

    Nearly every operation is delegated to the wrapped Arvados `Collection`
    object.  That object reports additions, removals and modifications
    through a notify callback, `CollectionDirectoryBase.on_event`, and the
    FUSE inodes and directory entries are created, deleted or invalidated
    in reaction to those events.

    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
        """Wrap `collection`; `collection_root` is the directory whose
        flush() is called to persist changes made under this directory."""
        super().__init__(parent_inode, inodes, apiconfig, enable_write, filters)
        self.apiconfig = apiconfig
        self.collection = collection
        self.collection_root = collection_root
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the directory entry for collection `item`.

        An item that already carries a fuse_entry is re-parented here,
        which is only allowed when that entry is dead and still has an
        inode.  Otherwise a subdirectory or file entry is created and
        registered in the inode table.
        """
        name = self.sanitize_filename(name)
        prior = getattr(item, "fuse_entry", None)
        if prior is not None:
            if prior.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if prior.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            prior.dead = False
            self._entries[name] = prior
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(
                self.inode,
                self.inodes,
                self.apiconfig,
                self._enable_write,
                self._filters,
                item,
                self.collection_root,
            )
            # Register in the directory before populating the child.
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            file_entry = FuseArvadosFile(self.inode, item, mtime, self._enable_write)
            self._entries[name] = self.inodes.add_entry(file_entry)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Notify callback: mirror a Collection ADD/DEL/MOD event into the
        FUSE entries of this directory."""
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if not (collection == self.collection):
            return

        name = self.sanitize_filename(name)

        #
        # It's possible for another thread to have llfuse.lock and
        # be waiting on collection.lock.  Meanwhile, we released
        # llfuse.lock earlier in the stack, but are still holding
        # on to the collection lock, and now we need to re-acquire
        # llfuse.lock.  If we don't release the collection lock,
        # we'll deadlock where we're holding the collection lock
        # waiting for llfuse.lock and the other thread is holding
        # llfuse.lock and waiting for the collection lock.
        #
        # The correct locking order here is to take llfuse.lock
        # first, then the collection lock.
        #
        # Since collection.lock is an RLock, it might be locked
        # multiple times, so we need to release it multiple times,
        # keep a count, then re-lock it the correct number of
        # times.
        #
        released = 0
        try:
            while True:
                self.collection.lock.release()
                released += 1
        except RuntimeError:
            # The lock is now fully released by this thread.
            pass

        try:
            with llfuse.lock, self.collection.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    removed = self._entries.pop(name)
                    self.inodes.invalidate_entry(self, name)
                    self.inodes.del_entry(removed)
                elif event == arvados.collection.MOD:
                    if getattr(item, "fuse_entry", None) is not None:
                        self.inodes.invalidate_inode(item.fuse_entry)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name])

                record_file = self.collection_record_file
                if record_file is not None:
                    record_file.invalidate()
                    self.inodes.invalidate_inode(record_file)
        finally:
            # Restore the collection lock to its original hold count.
            for _ in range(released):
                self.collection.lock.acquire()

    def populate(self, mtime):
        """Set the mtime, subscribe to collection events and create an
        entry for every item currently in the collection."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for child_name, child in self.collection.items():
                self.new_entry(child_name, child, self.mtime())

    def writable(self):
        """True only if writes are enabled and the collection is writable."""
        return self._enable_write and self.collection.writable()

    @use_counter
    def flush(self):
        """Delegate to the root directory of this collection."""
        self.collection_root.flush()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name`; EROFS if not writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            new_file = self.collection.open(name, "w")
            new_file.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory `name`; EROFS if not writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove file `name` and flush; EROFS if not writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove subdirectory `name` and flush; EROFS if not writable."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` here.

        Raises FUSEError with EROFS if this directory is not writable,
        EPERM if `src` is not a collection directory, and ENOTEMPTY,
        ENOTDIR or EISDIR when an existing target is incompatible with
        the source entry.  Both directories are flushed afterwards.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            ent_is_file = isinstance(ent, FuseArvadosFile)
            ent_is_dir = isinstance(ent, CollectionDirectoryBase)
            tgt_is_file = isinstance(tgt, FuseArvadosFile)
            tgt_is_dir = isinstance(tgt, CollectionDirectoryBase)
            # File over file is always allowed; only the other
            # combinations need checking.
            if not (ent_is_file and tgt_is_file):
                if ent_is_dir and tgt_is_dir:
                    if len(tgt) > 0:
                        raise llfuse.FUSEError(errno.ENOTEMPTY)
                elif ent_is_dir and tgt_is_file:
                    raise llfuse.FUSEError(errno.ENOTDIR)
                elif ent_is_file and tgt_is_dir:
                    raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super().clear()
        self.collection = None
453
454
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
        """Set up the directory without loading the collection.

        :api: API client; its config and keep attributes are used

        :num_retries: retry count passed to the collection objects

        :collection_record: either a dict with at least a 'uuid' key (and
        optionally 'modified_at'), or a locator string, or None

        :explicit_collection: accepted for interface compatibility; not
        used by this constructor

        The collection itself is loaded later, by update().
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        try:
            # Poll at half the blob signature TTL from the discovery document.
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot raise AttributeError
        # when there is no locator and no collection has been loaded yet.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` refers to this collection, by uuid
        or by portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """True if writes are enabled and the collection is writable.

        Before the collection is loaded, the answer is derived from the
        locator (see __init__).
        """
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    @use_counter
    def flush(self):
        """Update or save the collection, then record the API response.

        Does nothing when the directory is not writable.
        """
        if not self.writable():
            return
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        """True if the locator is a uuid; False for other locators and
        when there is no locator."""
        if self.collection_locator is None:
            return False
        return (uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the backing collection with `coll_reader` and
        repopulate the entries."""
        if self.inode:
            self.clear()
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        """Refresh mtime, manifest size and locator from an API record.

        Raises Exception if the record is empty or None.  Invalidates the
        '.arvados#collection' record file when one exists, then marks the
        directory fresh.
        """
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)
        self.fresh()

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self):
        """Load or refresh the collection.

        Returns True on success (including when there is nothing to do)
        and False after logging an error, in which case the directory is
        invalidated.
        """
        # Bound before any code that can raise, so the exception handlers
        # below can always refer to it.
        new_collection_record = None
        try:
            if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
                # It's immutable, nothing to update
                return True

            if self.collection_locator is None:
                # No collection locator to retrieve from
                self.fresh()
                return True

            # Only release _updating_lock in the finally clause if this
            # call actually acquired it.
            lock_acquired = False
            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    lock_acquired = True
                    if not self.stale():
                        return True

                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                    coll_reader = None
                    if self.collection is not None:
                        # Already have a collection object
                        self.collection.update()
                        new_collection_record = self.collection.api_response()
                    else:
                        # Create a new collection object
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock

                if new_collection_record is not None:
                    if coll_reader is not None:
                        self.new_collection(new_collection_record, coll_reader)
                    else:
                        self.new_collection_record(new_collection_record)

                return True
            finally:
                if lock_acquired:
                    self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    def collection_record(self):
        """Flush pending changes and return the collection's API response."""
        self.flush()
        return self.collection.api_response()

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named `item`.

        The special name '.arvados#collection' returns a JSON file backed
        by collection_record(), created on first lookup.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#collection' is always present; otherwise defer to
        the base class."""
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Invalidate the record file (if any) and then this directory."""
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """True if this directory has a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the collection's threads, delete all entries and reset
        the manifest size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
644
645
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations do nothing.

        The overrides accept and ignore any arguments so that they stay
        call-compatible with the methods they replace.
        """
        def save(self, *args, **kwargs):
            pass
        def save_new(self, *args, **kwargs):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
        """Create an empty scratch collection and populate the directory.

        `enable_write` is accepted for interface compatibility but the
        directory is always created writable.
        """
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, filters, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as the base class does, then invalidate the
        '.arvados#collection' record file if it has been created."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            return

        # See discussion in CollectionDirectoryBase.on_event
        lockcount = 0
        try:
            while True:
                self.collection.lock.release()
                lockcount += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock:
                with self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self)
        finally:
            while lockcount > 0:
                self.collection.lock.acquire()
                lockcount -= 1

    def collection_record(self):
        """Return a record dict describing the current scratch contents.

        The 'uuid' is always None because the collection is never saved.
        """
        with llfuse.lock_released:
            return {
                "uuid": None,
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
                "storage_classes_desired": self.collection.storage_classes_desired(),
            }

    def __contains__(self, k):
        """'.arvados#collection' is always present; otherwise defer to
        the base class."""
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    @use_counter
    def __getitem__(self, item):
        """Return the entry named `item`.

        The special name '.arvados#collection' returns a JSON file backed
        by collection_record(), created on first lookup.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def persisted(self):
        """Return False; the scratch collection has no saved record."""
        return False

    def writable(self):
        """Return True; the scratch directory is always writable."""
        return True

    def flush(self):
        """No-op; there is nothing to save."""
        pass

    def want_event_subscribe(self):
        """Return False."""
        return False

    def finalize(self):
        """Stop the collection's threads."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if any) and then this directory."""
        # Test against None, as every other check of this attribute does.
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
740
741
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
        """Set up the directory.

        pdh_only, when true, makes lookups accept only portable data hashes.
        storage_classes is handed to every ProjectDirectory created here.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        """Set the attribute; populate the directory the first time an inode is assigned.

        The checks below short-circuit in this order on purpose: attributes
        other than 'inode' (and the initial ``inode = None``) are set before
        ``_entries`` exists.
        """
        super(MagicDirectory, self).__setattr__(name, value)
        if name != 'inode' or self.inode is None or self._entries:
            return

        # When we're assigned an inode, add a README.
        self._entries['README'] = self.inodes.add_entry(
            StringFile(self.inode, self.README_TEXT, time.time()))
        # If we're the root directory, add an identical by_id subdirectory.
        if self.inode == llfuse.ROOT_INODE:
            self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                self.inode,
                self.inodes,
                self.api,
                self.num_retries,
                self._enable_write,
                self._filters,
                self.pdh_only,
                # Forward storage classes too, so projects reached through
                # by_id behave like those reached through the root.
                storage_classes=self.storage_classes,
            ))

    def __contains__(self, k):
        """Return True if k is an existing entry or can be loaded as one.

        Names that are not a portable data hash (or, unless pdh_only is set,
        a uuid) are rejected without contacting the API. Otherwise a
        ProjectDirectory or CollectionDirectory is created and kept only if
        its update() succeeds. Any exception is logged and reported as
        "not present".
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Kept outside the try so the except clause can always inspect it.
        e = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[
                        ['group_class', 'in', ['project','filter']],
                        ["uuid", "=", k],
                        *self._filters_for('groups', qualified=False),
                    ],
                ).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    project[u'items'][0],
                    storage_classes=self.storage_classes,
                ))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    k,
                ))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # An entry for k appeared in the meantime; discard ours.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry named item, loading it on demand; raise KeyError if absent."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        """Do nothing; entries of this directory are never cleared."""
        pass

    def want_event_subscribe(self):
        """Return True unless this directory is restricted to portable data hashes."""
        return not self.pdh_only
860
861
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
        """Set up the directory; ``_extra`` collects tag names found by direct lookup."""
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        self._extra = set()

    def want_event_subscribe(self):
        """Report that this directory wants event subscriptions (always True)."""
        return True

    @use_counter
    def update(self):
        """List distinct tag names (up to 1000) and merge them into the entries.

        Names recorded in ``_extra`` are merged in as well. Nothing is merged
        when the API response has no "items" key.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '!=', ''],
                    *self._filters_for('links', qualified=False),
                ],
                select=['name'],
                distinct=True,
                limit=1000,
            )
            listing = request.execute(num_retries=self.num_retries)

        if "items" not in listing:
            return

        def tag_name(record):
            return record['name']

        def same_tag(entry, record):
            return entry.tag == record['name']

        def make_entry(record):
            return TagDirectory(
                self.inode,
                self.inodes,
                self.api,
                self.num_retries,
                self._enable_write,
                self._filters,
                record['name'],
                poll=self._poll,
                poll_time=self._poll_time,
            )

        records = listing['items']+[{"name": n} for n in self._extra]
        self.merge(records, tag_name, same_tag, make_entry)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry for tag ``item``, querying the API if it is not listed yet.

        When the API reports at least one tag link with that name, the name is
        remembered in ``_extra`` and the directory is updated before the
        final lookup.
        """
        parent = super(TagsDirectory, self)
        if parent.__contains__(item):
            return parent.__getitem__(item)
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', item],
                    *self._filters_for('links', qualified=False),
                ],
                limit=1,
            )
            found = request.execute(num_retries=self.num_retries)
        if found["items"]:
            self._extra.add(item)
            self.update()
        return parent.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if ``k`` is already an entry or can be fetched via ``self[k]``."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
937
938
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                 poll=False, poll_time=60):
        """Set up the directory for the tag name ``tag``."""
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
        self.tag = tag
        self.api = api
        self.num_retries = num_retries
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        """Report that this directory wants event subscriptions (always True)."""
        return True

    @use_counter
    def update(self):
        """List tag links for ``self.tag`` that point at collections and merge them in.

        Each entry is a CollectionDirectory keyed by the link's head_uuid.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[
                    ['link_class', '=', 'tag'],
                    ['name', '=', self.tag],
                    ['head_uuid', 'is_a', 'arvados#collection'],
                    *self._filters_for('links', qualified=False),
                ],
                select=['head_uuid'],
            )
            tagged = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def same_collection(entry, link):
            return entry.collection_locator == link['head_uuid']

        def make_entry(link):
            return CollectionDirectory(
                self.inode,
                self.inodes,
                self.api,
                self.num_retries,
                self._enable_write,
                self._filters,
                link['head_uuid'],
            )

        self.merge(tagged['items'], entry_name, same_collection, make_entry)
982
983
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 project_object, poll=True, poll_time=3, storage_classes=None):
        """Set up the directory for the API record ``project_object``.

        storage_classes, when not None, is sent as "storage_classes_desired"
        for collections created by mkdir() and passed on to subprojects.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Created on the first update(); served as '.arvados#project'.
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        # Fetched on the first writable() call.
        self._current_user = None
        # Becomes True once items() is called; until then update() does not
        # list the project contents.
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        """Report that this directory wants event subscriptions (always True)."""
        return True

    def createDirectory(self, i):
        """Build the directory entry object for API record ``i``, chosen by its uuid type.

        Returns None for a link that does not point at a collection and for
        records whose uuid matches no known pattern.
        """
        common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(*common_args, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(*common_args, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        """Return the uuid of the project this directory represents."""
        return self.project_uuid

    def items(self):
        """Return the entries, and switch update() to full-listing mode from now on."""
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the entry name to use for API record ``i``, or None if it gets none.

        Collections, subprojects and name links to collections use their
        name as-is. Other records with an 'arvados#...' kind get the kind
        (without the 'arvados#' prefix) appended after a dot.
        """
        if 'name' in i:
            if i['name'] is None or len(i['name']) == 0:
                return None
            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
                # collection or subproject
                return i['name']
            elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                # name link
                return i['name']
            elif 'kind' in i and i['kind'].startswith('arvados#'):
                # something else
                return "{}.{}".format(i['name'], i['kind'][8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, in full-listing mode, the directory contents.

        The ObjectFile for the project record is created on the first call.
        Before items() has been called nothing is fetched and True is
        returned. Otherwise the project record is re-read, subprojects and
        current-version collections are listed, and the result is merged into
        the entries. Returns True after a merge, and None when the directory
        is found not to be stale once the update lock is held.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#group'],
                        ['groups.group_class', 'in', ['project', 'filter']],
                        *self._filters_for('groups', qualified=True),
                    ],
                ))
                # Only collections that are their own current version are listed.
                contents.extend(obj for obj in arvados.util.keyset_list_all(
                    self.api.groups().contents,
                    order_key='uuid',
                    num_retries=self.num_retries,
                    uuid=self.project_uuid,
                    filters=[
                        ['uuid', 'is_a', 'arvados#collection'],
                        *self._filters_for('collections', qualified=True),
                    ],
                ) if obj['current_version_uuid'] == obj['uuid'])

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the entry object for record ``i``, register it under ``name`` and return it."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named ``k``.

        '.arvados#project' yields the project record file. In full-listing
        mode, or when the entry is already known, the stored entry is
        returned. Otherwise the API is asked for a subproject, then a
        collection, owned by this project under that name, and the match is
        added as a new entry. Raises KeyError when nothing matches.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(
                filters=[
                    ["owner_uuid", "=", self.project_uuid],
                    ["group_class", "in", ["project","filter"]],
                    namefilter,
                    *self._filters_for('groups', qualified=False),
                ],
                limit=2,
            ).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(
                    filters=[
                        ["owner_uuid", "=", self.project_uuid],
                        namefilter,
                        *self._filters_for('collections', qualified=False),
                    ],
                    limit=2,
                ).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True for '.arvados#project' or any name that ``self[k]`` can resolve."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return True if writes are enabled and the current user is listed in "writable_by".

        The current user record is fetched from the API on first use and
        kept for later calls.
        """
        if not self._enable_write:
            return False
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        """Report that this directory is persisted (always True)."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called ``name`` in this project.

        Raises llfuse.FUSEError(EROFS) when the project is not writable and
        llfuse.FUSEError(EEXIST) when the API client reports an error.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        new_collection = {
            "owner_uuid": self.project_uuid,
            "name": name,
            "manifest_text": "",
        }
        if self.storage_classes is not None:
            new_collection["storage_classes_desired"] = self.storage_classes

        try:
            with llfuse.lock_released:
                self.api.collections().create(body=new_collection).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection called ``name`` from this project.

        Raises llfuse.FUSEError with EROFS (not writable), ENOENT (no such
        entry), EPERM (entry is not a collection) or ENOTEMPTY.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Look the entry up once, and resolve its uuid before the llfuse lock
        # is released, so the entry table is not read while unlocked.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        target_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=target_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection ``name_old`` from project directory ``src`` here as ``name_new``.

        Raises llfuse.FUSEError with EROFS when this project is not writable,
        and EPERM when ``src`` is not a ProjectDirectory, the entry is not a
        collection, or ``name_new`` already exists here.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a change event for an object to this directory's entries.

        The old and new entry names are derived from the event's
        "old_attributes" and "new_attributes"; an entry is then removed,
        renamed or added when the two names differ.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    # Renamed within this project: keep the existing entry object.
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1290
1291
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 exclude, poll=False, poll_time=60, storage_classes=None):
        """Set up the directory and fetch the current user record from the API.

        ``exclude`` and ``poll`` are accepted but not used by this
        constructor; ``_poll`` is always set to True.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes

    @use_counter
    def update(self):
        """Rebuild the list of shared project roots and their owners.

        Uses the API's groups.shared method when the discovery document
        advertises it; otherwise lists all projects and works out which ones
        are roots. The resulting name -> record mapping is merged into the
        entries as ProjectDirectory objects. Any exception is logged and
        swallowed.
        """
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    # Page through the results by uuid: each request asks for
                    # uuids greater than the last one seen.
                    page = []
                    while True:
                        resp = self.api.groups().shared(
                            filters=[
                                ['group_class', 'in', ['project','filter']],
                                *page,
                                *self._filters_for('groups', qualified=False),
                            ],
                            order="uuid",
                            limit=10000,
                            count="none",
                            include="owner_uuid",
                        ).execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['group_class', 'in', ['project','filter']],
                            *self._filters_for('groups', qualified=False),
                        ],
                        select=["uuid", "owner_uuid"],
                    ))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A root is a project owned neither by the current user
                    # nor by another project in the listing.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['uuid', 'in', list(root_owners)],
                            *self._filters_for('users', qualified=False),
                        ],
                    )
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[
                            ['uuid', 'in', list(root_owners)+roots],
                            *self._filters_for('groups', qualified=False),
                        ],
                    )
                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                # Owners appear under their name, or "first last" for
                # records that have no name but a first_name.
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Roots whose owner record is not available are listed directly.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(
                contents.items(),
                lambda i: i[0],
                lambda a, i: a.uuid() == i[1]['uuid'],
                lambda i: ProjectDirectory(
                    self.inode,
                    self.inodes,
                    self.api,
                    self.num_retries,
                    self._enable_write,
                    self._filters,
                    i[1],
                    poll=self._poll,
                    poll_time=self._poll_time,
                    storage_classes=self.storage_classes,
                ),
            )
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        """Report that this directory wants event subscriptions (always True)."""
        return True