17995: Fix merge error.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
10 import apiclient
11 import arvados
12 import errno
13 import functools
14 import llfuse
15 import logging
16 import re
17 import sys
18 import threading
19 import time
20 from apiclient import errors as apiclient_errors
21
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
24
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
27
28 _logger = logging.getLogger('arvados.arvados_fuse')
29
30
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
35
36
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes, apiconfig):
        """Create an empty directory.

        :parent_inode: integer inode number of the parent directory.

        :inodes: inode table that child entries are registered with.

        :apiconfig: callable returning the API server configuration
        (consulted by forward_slash_subst).

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()

    def forward_slash_subst(self):
        """Return the string that stands in for '/' in filenames, or None.

        The value comes from Collections.ForwardSlashNameSubstitution in
        the API server config and is cached after the first call.  A
        server config without that key is treated as '_'.  An empty
        value or '/' disables substitution (None).
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns in ('', '/'):
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config.

        Returns None for None, '_' for '' and '.', and '__' for '..'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Refresh the directory if it is stale.

        HTTP errors from the API server are logged instead of raised, so a
        failed refresh leaves the current contents visible.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        If several items map to the same (sanitized) name, only the first
        one is used; later ones are skipped.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name in self._entries:
                # An earlier item already claimed this name.  Overwriting
                # it would orphan that entry: it has either just been
                # added to the inode table or already been removed from
                # oldentries, so it would never reach del_entry().
                _logger.debug("Skipping duplicate entry '%s' in inode %s", name, self.inode)
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries.pop(name)
            else:
                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not found in 'items'
        for name, ent in viewitems(oldentries):
            _logger.debug("Forgetting about entry '%s' on inode %i", name, self.inode)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """True if this directory or any direct entry reports in_use()."""
        if super(Directory, self).in_use():
            return True
        for v in itervalues(self._entries):
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """True if this directory or any direct entry reports a reference."""
        if super(Directory, self).has_ref(only_children):
            return True
        for v in itervalues(self._entries):
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in viewitems(parent._entries):
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
260
261
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.  The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, apiconfig, collection):
        # Directory.__init__ stores apiconfig on self.
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the FUSE entry for collection item `item`.

        If the item already carries a dead fuse_entry, that entry is revived
        and reused.  Otherwise, a subcollection becomes a
        CollectionDirectoryBase and anything else becomes a FuseArvadosFile.
        The item's fuse_entry is then pointed at the entry.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback: mirror ADD/DEL/MOD into FUSE state."""
        if collection == self.collection:
            name = self.sanitize_filename(name)

            #
            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock.  Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock.  If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            #
            lockcount = 0
            try:
                while True:
                    self.collection.lock.release()
                    lockcount += 1
            except RuntimeError:
                pass

            try:
                with llfuse.lock:
                    with self.collection.lock:
                        if event == arvados.collection.ADD:
                            self.new_entry(name, item, self.mtime())
                        elif event == arvados.collection.DEL:
                            # Guard like MOD does: an unknown name must not
                            # raise KeyError out of the collection's
                            # notify callback.
                            ent = self._entries.pop(name, None)
                            if ent is not None:
                                self.inodes.invalidate_entry(self, name)
                                self.inodes.del_entry(ent)
                        elif event == arvados.collection.MOD:
                            if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                                self.inodes.invalidate_inode(item.fuse_entry)
                            elif name in self._entries:
                                self.inodes.invalidate_inode(self._entries[name])
            finally:
                while lockcount > 0:
                    self.collection.lock.acquire()
                    lockcount -= 1

    def populate(self, mtime):
        """Subscribe to collection events and create entries for its items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in viewitems(self.collection):
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` in `src` to `name_new` here, overwriting a target.

        Raises FUSEError EPERM if src is not collection-backed, ENOTEMPTY for
        a non-empty target directory, and ENOTDIR/EISDIR on dir/file mismatch.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
415
416
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up a collection root; contents are loaded lazily by update().

        :collection_record: either an API record dict (its 'uuid' becomes
        the locator) or a locator string, or None.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Default so writable() works when there is no locator and no
        # collection loaded yet (previously raised AttributeError).
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        # A None locator has nothing to subscribe to (and uuid_pattern.match
        # would raise TypeError on it).
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace current contents with `coll_reader` and its record."""
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True if the directory is up to date afterwards (including
        when another thread finished refreshing it first), False if loading
        failed, in which case the directory is invalidated.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed it while we waited for
                        # the lock; that is success, not failure.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
591
592
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A Keep-backed scratch directory whose collection is never saved.

    Programs can read the virtual .arvados#collection file to obtain the
    current manifest, e.g. to snapshot the scratch contents or to use them
    as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save()/save_new() are deliberate no-ops."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle a collection event, then invalidate the record file."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return

        # Lock ordering must be llfuse.lock first, then collection.lock.
        # collection.lock is an RLock that may be held several times, so
        # drop every level we hold, count them, and re-take them afterwards.
        released = 0
        try:
            while True:
                self.collection.lock.release()
                released += 1
        except RuntimeError:
            pass

        try:
            with llfuse.lock:
                with self.collection.lock:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)
                    _logger.debug("%s invalidated collection record", self)
        finally:
            for _ in range(released):
                self.collection.lock.acquire()

    def collection_record(self):
        """Build a record-like dict describing the scratch collection."""
        with llfuse.lock_released:
            scratch = self.collection
            return {
                "uuid": None,
                "manifest_text": scratch.manifest_text(),
                "portable_data_hash": scratch.portable_data_hash(),
                "storage_classes_desired": scratch.storage_classes_desired(),
            }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            # Created lazily; its content is produced by collection_record().
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
682
683
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
        """:pdh_only: only resolve portable data hashes, not uuids.

        :storage_classes: passed on to ProjectDirectory entries created here.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            # It must carry the same storage_classes, otherwise projects
            # opened through by_id would silently lose them.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only,
                        storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Look `k` up on the server and add it as an entry if it exists."""
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Lost a race: keep the existing entry, drop ours.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
776
777
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        # _poll is unconditionally enabled here; only the interval is
        # configurable.
        self._poll = True
        self._poll_time = poll_time
        # Tag names confirmed by a direct lookup in __getitem__.  They are
        # merged into every update() because the listing there is capped at
        # limit=1000 and may not include them.
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Rebuild the tag subdirectories from the distinct tag link names.

        Each name becomes a TagDirectory entry.  Nothing is merged if the
        API response has no "items" key.
        """
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        if "items" not in tags:
            return
        listed = tags['items'] + [{"name": tag_name} for tag_name in self._extra]

        def entry_name(rec):
            return rec['name']

        def same_tag(existing, rec):
            return existing.tag == rec['name']

        def make_tag_dir(rec):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                rec['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(listed, entry_name, same_tag, make_tag_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the TagDirectory for ``item``.

        A name that is not already listed is checked with a one-row API
        query.  If a tag link with that name exists, it is remembered in
        ``_extra`` and the listing is refreshed.  Raises KeyError (from the
        base lookup) when the tag is unknown.
        """
        if not super(TagsDirectory, self).__contains__(item):
            with llfuse.lock_released:
                matches = self.api.links().list(
                    filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
                ).execute(num_retries=self.num_retries)
            if matches["items"]:
                self._extra.add(item)
                self.update()
        return super(TagsDirectory, self).__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if ``k`` is a listed tag or one found by direct lookup."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
830
831
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        # Tag link name whose tagged collections this directory lists.
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the entries from every tag link named ``self.tag``.

        Each tag link whose head is a collection becomes a
        CollectionDirectory entry named after the link's head_uuid.
        """
        with llfuse.lock_released:
            # A single links().list() call returns only one page of results
            # (the server's default limit applies when none is given), which
            # silently truncated tags applied to many collections.  Page
            # through the complete listing instead.  "uuid" is selected
            # because keyset paging orders and resumes by it.
            taggedcollections = list(arvados.util.keyset_list_all(
                self.api.links().list,
                order_key="uuid",
                num_retries=self.num_retries,
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['uuid', 'head_uuid']))
        self.merge(taggedcollections,
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
862
863
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are subprojects (groups of class "project" or "filter") and
    collections owned by the project.  Until items() is called, names are
    resolved one at a time with targeted API queries in __getitem__.  After
    that, update() fetches and merges the complete listing.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        # API record for this project.  update() re-fetches it as a group
        # or a user record depending on the UUID type.
        self.project_object = project_object
        # ObjectFile wrapping project_object; created lazily by update().
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes update(), which does its API work with the llfuse lock
        # released.
        self._updating_lock = threading.Lock()
        # Cached users().current() response, filled in by writable().
        self._current_user = None
        # Set by items(); until then update() does not fetch the listing.
        self._full_listing = False
        # Sent as storage_classes_desired on mkdir and passed on to child
        # ProjectDirectory instances.
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Build the inode object for API record ``i``.

        Returns a CollectionDirectory for collections (and for name links
        pointing at a collection or portable data hash), a ProjectDirectory
        for groups, an ObjectFile for any other UUID-bearing object, or None
        if the record is not representable.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file lives in this directory, so its parent is this
            # directory's inode (as for project_object_file in update()),
            # not this directory's own parent.
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        # Listing everything switches update() into full-listing mode.
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the directory entry name for API record ``i``, or None.

        Collections, subprojects and collection name links use the record's
        name unchanged.  Other Arvados objects get "<name>.<kind suffix>".
        Records with no usable name, or whose kind is unrecognized, map to
        None.
        """
        if 'name' not in i:
            return None
        if i['name'] is None or len(i['name']) == 0:
            return None
        # Records may lack "uuid" or "head_kind" (e.g. partial attributes
        # from events), so read them defensively.
        obj_uuid = i.get('uuid')
        if obj_uuid and (collection_uuid_pattern.match(obj_uuid) or group_uuid_pattern.match(obj_uuid)):
            # collection or subproject
            return i['name']
        elif obj_uuid and link_uuid_pattern.match(obj_uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return i['name']
        elif 'kind' in i and i['kind'].startswith('arvados#'):
            # something else
            return "{}.{}".format(i['name'], i['kind'][8:])
        return None

    @use_counter
    def update(self):
        """Refresh this directory's entries from the API.

        Always makes sure the ObjectFile for the project record exists.
        The full listing is fetched only after items() has been called;
        before that this returns True immediately.  If the directory is no
        longer stale once the update lock is held (presumably refreshed by
        a concurrent update), it returns None without fetching.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                        order_key="uuid",
                                                        num_retries=self.num_retries,
                                                        uuid=self.project_uuid,
                                                        filters=[["uuid", "is_a", "arvados#group"],
                                                                 ["groups.group_class", "in", ["project","filter"]]]))
                contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
                                                             order_key="uuid",
                                                             num_retries=self.num_retries,
                                                             uuid=self.project_uuid,
                                                             filters=[["uuid", "is_a", "arvados#collection"]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the inode object for record ``i`` and store it as ``name``."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named ``k``, querying the API if it is not cached.

        The special name ``.arvados#project`` returns the project's own
        ObjectFile.  Otherwise, when the full listing is not loaded, a
        subproject and then a collection with a matching (possibly
        sanitized) name are looked up.

        Raises:
            KeyError: no entry named ``k`` exists.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if ``k`` resolves through __getitem__."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user is in the project's writable_by list."""
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called ``name`` in this project.

        Raises:
            llfuse.FUSEError: EEXIST for any apiclient error from the create
                call (the error itself is logged).
        """
        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection ``name`` from this project.

        Raises:
            llfuse.FUSEError: ENOENT if missing, EPERM if not a collection,
                ENOTEMPTY if the collection has entries.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Resolve the entry once, while the llfuse lock is held.  The
        # original re-ran self[name] (which may trigger update()) inside
        # llfuse.lock_released.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            self.api.collections().delete(uuid=ent.uuid()).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection ``name_old`` from project ``src`` here as ``name_new``.

        Raises:
            llfuse.FUSEError: EPERM unless both sides are projects, the
                source entry is a collection, and ``name_new`` is unused.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a change event for an object in (or moving into/out of) this project.

        Compares the entry name implied by the old and new attributes and
        renames, adds or removes the directory entry to match.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1138
1139
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        # Fetched once at construction; used to skip my own projects in the
        # fallback listing path of update().
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the `poll` and `exclude` arguments are not used;
        # polling is always enabled here.
        self._poll = True
        self._poll_time = poll_time
        # Serializes update(), which does its API work with the llfuse lock
        # released.
        self._updating_lock = threading.Lock()
        # Passed on to each child ProjectDirectory.
        self.storage_classes = storage_classes

    @use_counter
    def update(self):
        """Rebuild the entries for users/groups that shared projects with me.

        If the API discovery document advertises ``groups.shared``, it is
        used, with uuid keyset paging, to get shared projects and their
        owners.  Otherwise every visible project is listed, and those not
        owned by me or by another visible project are treated as shared
        roots.  Each resulting entry is a ProjectDirectory named after the
        owner (or the root project).  Errors are logged and swallowed.
        """
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    page = []
                    while True:
                        # Retry like every other API call here; the
                        # original executed this request with no retries.
                        resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                                        order="uuid",
                                                        limit=10000,
                                                        count="none",
                                                        include="owner_uuid").execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        # Next page starts after the last uuid in this one.
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['group_class','in',['project','filter']]],
                        select=["uuid", "owner_uuid"]))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        # A project owned neither by me nor by another
                        # visible project is the top of a shared tree.
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])

                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(viewitems(contents),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        return True