Merge branch '17995-filter-by-comparing-attrs'
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import apiclient
6 import arvados
7 import errno
8 import functools
9 import llfuse
10 import logging
11 import re
12 import sys
13 import threading
14 import time
15 from apiclient import errors as apiclient_errors
16
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile('[\x00/]')
30
31
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.  Subclasses
    override update() to refresh the entries when the directory is stale.
    """

    def __init__(self, parent_inode, inodes, apiconfig):
        """Initialize an empty directory.

        :parent_inode: integer inode number of the parent directory.

        :inodes: inode table used to add, look up, invalidate and delete
        entries.

        :apiconfig: zero-argument callable returning the API server
        configuration dict; only called lazily by forward_slash_subst().

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # Presumably filled in when this object is registered with the
        # inode table (inodes.add_entry) -- TODO confirm.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        # Sanitized filename -> File/Directory entry object.
        self._entries = {}
        self._mtime = time.time()

    def forward_slash_subst(self):
        """Return the string that replaces '/' in filenames, or None.

        The value comes from Collections.ForwardSlashNameSubstitution in the
        API server config and is cached in self._fsns after the first call.
        An API server without that setting yields '_'; an empty value or
        '/' itself disables substitution (None).
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Turn an Arvados name into a valid FUSE filename.

        '/' is replaced according to ForwardSlashNameSubstitution (see
        forward_slash_subst(), which reads self.apiconfig), then any
        remaining disallowed characters become '_'.  The special names
        '', '.' and '..' are mapped to '_', '_' and '__'.  None is passed
        through unchanged.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        HTTP errors from the API client are logged as warnings and
        swallowed, so the existing (stale) contents stay visible.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Mark the directory fresh and touch it in the inode table."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        If anything was added or removed, the directory inode is invalidated
        and its mtime bumped.  The directory is always marked fresh.
        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """True if this directory or any direct entry reports in_use()."""
        if super(Directory, self).in_use():
            return True
        for v in self._entries.values():
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """True if this directory or any direct entry reports has_ref()."""
        if super(Directory, self).has_ref(only_children):
            return True
        for v in self._entries.values():
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
255
256
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.  The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, apiconfig, collection):
        """Wrap `collection` (may be None; CollectionDirectory passes None
        and assigns the collection later) as a directory.  Entries are not
        created until populate() is called."""
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
        self.apiconfig = apiconfig
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Create (or revive) the FUSE entry for collection item `item`.

        If `item` already carries a fuse_entry, that entry must be marked
        dead and still have an inode; it is revived and reused.  Otherwise a
        subcollection becomes a child CollectionDirectoryBase (populated
        recursively) and anything else becomes a FuseArvadosFile.  The
        resulting entry is stored back on item.fuse_entry.

        Raises Exception if an existing fuse_entry is not dead or has no inode.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
        # Link the collection item back to its FUSE entry.
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection change callback (registered in populate()).

        Only events for this directory's own collection are handled:
        ADD creates an entry, DEL removes and invalidates it, MOD
        invalidates the affected inode.
        """
        if collection == self.collection:
            name = self.sanitize_filename(name)

            #
            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock.  Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock.  If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            #
            lockcount = 0
            try:
                while True:
                    # Raises RuntimeError once this thread no longer holds it.
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                pass

            try:
                with llfuse.lock:
                    with self.collection.lock:
                        if event == arvados.collection.ADD:
                            self.new_entry(name, item, self.mtime())
                        elif event == arvados.collection.DEL:
                            ent = self._entries[name]
                            del self._entries[name]
                            self.inodes.invalidate_entry(self, name)
                            self.inodes.del_entry(ent)
                        elif event == arvados.collection.MOD:
                            if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                                self.inodes.invalidate_inode(item.fuse_entry)
                            elif name in self._entries:
                                self.inodes.invalidate_inode(self._entries[name])
            finally:
                # Restore the caller's original RLock recursion depth.
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1

    def populate(self, mtime):
        """Set mtime, subscribe to change events and create an entry for
        every item currently in the collection (under the collection lock)."""
        self._mtime = mtime
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name`; the FUSE entry appears via on_event."""
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory `name` (and any missing parents)."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove file `name` from the collection, then flush()."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove subdirectory `name` from the collection, then flush()."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` in directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EPERM if src is not collection-backed,
        ENOTEMPTY when replacing a non-empty directory, ENOTDIR when moving a
        directory onto a file, and EISDIR when moving a file onto a
        directory.  Both collections are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
411
412
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection.

    The collection is identified by collection_locator, which is either a
    collection uuid (loaded as a writable Collection) or a portable data
    hash (loaded as a read-only CollectionReader).  A virtual
    '.arvados#collection' file exposes the collection record.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up the directory; contents are loaded later by update().

        :api: Arvados API client (its config, keep client and discovery
        document are used).

        :num_retries: retry count passed to collection loading.

        :collection_record: a collection record dict (its 'uuid' and
        'modified_at' are used), a locator string (uuid or PDH), or None.

        :explicit_collection: not referenced by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Poll at half of blobSignatureTtl (default 60*60*2), presumably
            # so signed locators are refreshed before they expire.
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Only uuid-addressed collections are writable.  Always initialize
        # the flag so writable() works even when there is no locator yet.
        self._writable = bool(self.collection_locator and
                              uuid_pattern.match(self.collection_locator))
        self._updating_lock = threading.Lock()

    def same(self, i):
        """True if record `i` refers to this collection by uuid or PDH."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Only uuid-addressed collections can change, so only they want events."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with `coll_reader`.

        Existing entries are cleared (if this directory has an inode), the
        record, mtime, locator and record file are updated, and the new
        collection is populated.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Bring the directory up to date with the API server / Keep.

        Returns True on success -- including when nothing needed doing, or
        when the directory was already fresh once the update lock was
        obtained -- and False after logging an error, in which case the
        directory is invalidated.

        :to_record_version: if the loaded collection already knows this
        version (known_past_version), the reload is skipped.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Addressed by content hash: the contents can never change.
                return True

            if self.collection_locator is None:
                # Nothing to fetch.
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Already refreshed (e.g. by another thread while we
                        # waited for the lock).  This is success, not failure:
                        # callers such as MagicDirectory test the result.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    new_collection_record = None
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                # end with llfuse.lock_released, re-acquire lock
                if (new_collection_record is not None and
                    (self.collection_record is None or
                     self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
                    self.new_collection(new_collection_record, coll_reader)
                    self._manifest_size = len(coll_reader.manifest_text())
                    _logger.debug("%s manifest_size %i", self, self._manifest_size)

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry `item`; '.arvados#collection' is created lazily
        as an ObjectFile holding the collection record."""
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached record (and record file) and mark stale."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable and stop its background threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop collection threads, delete all entries, reset manifest size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
589
590
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Scratch directory backed by a collection that is never saved.

    Lets Keep be used as temporary scratch space.  Reading the virtual
    '.arvados#collection' file (a FuncToJSONFile over collection_record())
    yields the current manifest, so a userspace program can snapshot the
    scratch data or use it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save() and save_new() deliberately do nothing."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the change like the base class, then invalidate the
        record file (if it has been created) so readers see a new manifest."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return

        # See discussion in CollectionDirectoryBase.on_event: give up every
        # recursion level of the collection RLock, take llfuse.lock first and
        # the collection lock second, then restore the original depth.
        depth = 0
        try:
            while True:
                self.collection.lock.release()
                depth += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock:
                with self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self)
        finally:
            for _ in range(depth):
                self.collection.lock.acquire()

    def collection_record(self):
        """Build a record dict describing the current scratch contents."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            classes = self.collection.storage_classes_desired()
        return {
            "uuid": None,
            "manifest_text": manifest,
            "portable_data_hash": pdh,
            "storage_classes_desired": classes,
        }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
680
681
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
        """
        :pdh_only: if True, only portable data hashes are looked up (no uuids).

        :storage_classes: passed on to ProjectDirectory entries (and to the
        root's by_id sub-directory).
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            # Pass storage_classes too, so projects opened through by_id are
            # configured the same as those opened at the root.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only,
                        storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Look up `k` on the server and cache it as an entry if it exists.

        Group uuids are queried as project/filter groups and become
        ProjectDirectory entries; any other accepted locator becomes a
        CollectionDirectory.  Returns False for names that are neither a
        PDH nor (unless pdh_only) a uuid, when the lookup finds nothing,
        or when it raises (the error is logged).
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Another lookup got there first; keep the existing entry.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`, looking it up first; KeyError if absent."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Entries are looked up on demand; nothing to clear.
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
774
775
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user.

    The listing comes from a distinct query over tag links (capped at
    1000 names).  Tag names looked up explicitly but missing from that
    listing are remembered in ``_extra`` so they stay visible afterwards.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        # Polling is always on for this directory; only the interval is configurable.
        self._poll_time = poll_time
        self._poll = True
        # Tag names found by direct lookup in __getitem__.
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Re-list tag names and merge them into the directory entries."""
        with llfuse.lock_released:
            query = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000)
            listing = query.execute(num_retries=self.num_retries)
        if "items" not in listing:
            return

        wanted = listing['items'] + [{"name": n} for n in self._extra]

        def entry_name(rec):
            return rec['name']

        def same_tag(existing, rec):
            return existing.tag == rec['name']

        def make_tag_dir(rec):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                rec['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(wanted, entry_name, same_tag, make_tag_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the TagDirectory for ``item``, querying the API if not listed.

        Raises KeyError if no tag link with that name exists.
        """
        base = super(TagsDirectory, self)
        if base.__contains__(item):
            return base.__getitem__(item)
        with llfuse.lock_released:
            found = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
        if found["items"]:
            # Remember the name so update() keeps it in the listing.
            self._extra.add(item)
            self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
828
829
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.

    Entries are keyed by the tagged collection's uuid (the tag link's
    ``head_uuid``).
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
        self.tag = tag
        self.api = api
        self.num_retries = num_retries
        self._poll_time = poll_time
        self._poll = poll

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Re-list collections carrying ``self.tag`` and merge them in."""
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            links = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_dir(link):
            return CollectionDirectory(self.inode, self.inodes, self.api,
                                       self.num_retries, link['head_uuid'])

        self.merge(links['items'], entry_name, same_collection, make_collection_dir)
860
861
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Subprojects and filter groups appear as nested ProjectDirectory
    entries and collections as CollectionDirectory entries.  The project
    record itself is reachable under the name ``.arvados#project``; it
    is not stored in ``_entries``.

    Until ``items()`` is called (a full directory listing), ``update()``
    does not list the project.  Until then, ``__getitem__`` resolves
    individual names on demand with targeted API queries.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Held for the duration of update() (see the notes there).
        self._updating_lock = threading.Lock()
        # Cached users().current() record, fetched lazily by writable().
        self._current_user = None
        # Set by items(); until then update() skips listing the project.
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Build the inode object for API record ``i``.

        Returns a CollectionDirectory, ProjectDirectory or ObjectFile, or
        None if the record should not appear in this directory.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            # Name link: expose the linked collection.  head_kind may be
            # absent (e.g. in event attribute dicts), so don't index it.
            if i.get('head_kind') == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file is an entry of *this* directory, so its parent is
            # our inode, consistent with every other child created here.
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        # Listing the directory switches update() to full-listing mode.
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the directory entry name for API record ``i``, or None.

        Collections, subprojects and name links to collections use the
        record's name as-is.  Other records with an ``arvados#...`` kind
        are named ``<name>.<kind without the "arvados#" prefix>``.  Records
        without a usable name yield None.  Missing keys (as in partial
        attribute dicts from events) count as "no match" rather than
        raising KeyError.
        """
        if 'name' in i:
            if i['name'] is None or len(i['name']) == 0:
                return None
            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
                # collection or subproject
                return i['name']
            elif ("uuid" in i and link_uuid_pattern.match(i['uuid'])
                  and i.get('head_kind') == 'arvados#collection'):
                # name link
                return i['name']
            elif 'kind' in i and i['kind'].startswith('arvados#'):
                # something else
                return "{}.{}".format(i['name'], i['kind'][8:])
        return None


    @use_counter
    def update(self):
        """Refresh the project record and, in full-listing mode, its contents.

        Always makes sure ``project_object_file`` exists.  Returns True
        when the directory is usable: when no full listing has been
        requested yet, when it is no longer stale by the time
        ``_updating_lock`` is acquired, or after a successful refresh.
        API errors propagate to the caller.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        # _updating_lock is acquired only after the llfuse lock has been
        # released, and is held until after merge() (which runs with the
        # llfuse lock re-acquired).  The finally clause releases it on
        # every path.
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    # Nothing to do, e.g. another thread refreshed the
                    # directory while we waited for _updating_lock.
                    return True

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                        order_key="uuid",
                                                        num_retries=self.num_retries,
                                                        uuid=self.project_uuid,
                                                        filters=[["uuid", "is_a", "arvados#group"],
                                                                 ["groups.group_class", "in", ["project","filter"]]]))
                contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
                                                             order_key="uuid",
                                                             num_retries=self.num_retries,
                                                             uuid=self.project_uuid,
                                                             filters=[["uuid", "is_a", "arvados#collection"]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create and register the entry for record ``i`` under ``name``.

        Returns the registered inode object, or None (registering nothing)
        when ``createDirectory`` declines the record.
        """
        ent = self.createDirectory(i)
        if ent is None:
            return None
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named ``k``; raises KeyError if there is none.

        Outside full-listing mode, an unknown name is looked up directly
        as a subproject/filter group first and then as a collection owned
        by this project.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            ent = self._add_entry(contents[0], name)
            if ent is not None:
                return ent

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """True if the current user appears in the project's ``writable_by``."""
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called ``name`` in this project.

        Any API error is logged and reported to FUSE as EEXIST.
        """
        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection ``name`` from this project.

        Raises FUSEError ENOENT if missing, EPERM if ``name`` is not a
        collection, ENOTEMPTY if the collection has entries.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        # Read the uuid before dropping the llfuse lock, so directory state
        # is only touched while the lock is held (as elsewhere in this class).
        collection_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection ``name_old`` from project ``src`` here as ``name_new``.

        Raises FUSEError EPERM unless both directories are projects, the
        source is a collection, and ``name_new`` does not already exist;
        ENOENT if ``name_old`` does not exist in ``src``.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        try:
            ent = src[name_old]
        except KeyError:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): unlike the other API calls in this class, this one
        # runs while the llfuse lock is held.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply an event about an object in (or moving in/out of) this project."""
        properties = ev.get("properties") or {}
        # Copy the attribute dicts: "uuid" is added below and the event
        # object belongs to the caller.
        old_attrs = dict(properties.get("old_attributes") or {})
        new_attrs = dict(properties.get("new_attributes") or {})
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    # No-op if createDirectory() declines the record.
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1136
1137
1138 class SharedDirectory(Directory):
1139     """A special directory that represents users or groups who have shared projects with me."""
1140
1141     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1142                  poll=False, poll_time=60, storage_classes=None):
1143         super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1144         self.api = api
1145         self.num_retries = num_retries
1146         self.current_user = api.users().current().execute(num_retries=num_retries)
1147         self._poll = True
1148         self._poll_time = poll_time
1149         self._updating_lock = threading.Lock()
1150         self.storage_classes = storage_classes
1151
1152     @use_counter
1153     def update(self):
1154         try:
1155             with llfuse.lock_released:
1156                 self._updating_lock.acquire()
1157                 if not self.stale():
1158                     return
1159
1160                 contents = {}
1161                 roots = []
1162                 root_owners = set()
1163                 objects = {}
1164
1165                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1166                 if 'httpMethod' in methods.get('shared', {}):
1167                     page = []
1168                     while True:
1169                         resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1170                                                         order="uuid",
1171                                                         limit=10000,
1172                                                         count="none",
1173                                                         include="owner_uuid").execute()
1174                         if not resp["items"]:
1175                             break
1176                         page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1177                         for r in resp["items"]:
1178                             objects[r["uuid"]] = r
1179                             roots.append(r["uuid"])
1180                         for r in resp["included"]:
1181                             objects[r["uuid"]] = r
1182                             root_owners.add(r["uuid"])
1183                 else:
1184                     all_projects = list(arvados.util.keyset_list_all(
1185                         self.api.groups().list,
1186                         order_key="uuid",
1187                         num_retries=self.num_retries,
1188                         filters=[['group_class','in',['project','filter']]],
1189                         select=["uuid", "owner_uuid"]))
1190                     for ob in all_projects:
1191                         objects[ob['uuid']] = ob
1192
1193                     current_uuid = self.current_user['uuid']
1194                     for ob in all_projects:
1195                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1196                             roots.append(ob['uuid'])
1197                             root_owners.add(ob['owner_uuid'])
1198
1199                     lusers = arvados.util.keyset_list_all(
1200                         self.api.users().list,
1201                         order_key="uuid",
1202                         num_retries=self.num_retries,
1203                         filters=[['uuid','in', list(root_owners)]])
1204                     lgroups = arvados.util.keyset_list_all(
1205                         self.api.groups().list,
1206                         order_key="uuid",
1207                         num_retries=self.num_retries,
1208                         filters=[['uuid','in', list(root_owners)+roots]])
1209
1210                     for l in lusers:
1211                         objects[l["uuid"]] = l
1212                     for l in lgroups:
1213                         objects[l["uuid"]] = l
1214
1215                 for r in root_owners:
1216                     if r in objects:
1217                         obr = objects[r]
1218                         if obr.get("name"):
1219                             contents[obr["name"]] = obr
1220                         elif "first_name" in obr:
1221                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1222
1223                 for r in roots:
1224                     if r in objects:
1225                         obr = objects[r]
1226                         if obr['owner_uuid'] not in objects:
1227                             contents[obr["name"]] = obr
1228
1229             # end with llfuse.lock_released, re-acquire lock
1230
1231             self.merge(contents.items(),
1232                        lambda i: i[0],
1233                        lambda a, i: a.uuid() == i[1]['uuid'],
1234                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1235         except Exception:
1236             _logger.exception("arv-mount shared dir error")
1237         finally:
1238             self._updating_lock.release()
1239
1240     def want_event_subscribe(self):
1241         return True