17778: Merge branch 'master' into 17778-doc-update
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
10 import apiclient
11 import arvados
12 import errno
13 import functools
14 import llfuse
15 import logging
16 import re
17 import sys
18 import threading
19 import time
20 from apiclient import errors as apiclient_errors
21
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
24
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
27
28 _logger = logging.getLogger('arvados.arvados_fuse')
29
30
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
35
36
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    Subclasses override update() to repopulate the entries when the
    directory is stale; checkupdate() calls it.  The mutating operations
    (create, mkdir, unlink, rmdir, rename) raise NotImplementedError here
    and are overridden by subclasses that support writes.
    """

    def __init__(self, parent_inode, inodes, apiconfig):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory.

        :inodes: the inode table that child entries are added to and
        removed from.

        :apiconfig: a callable returning the API server configuration; it
        is consulted for Collections.ForwardSlashNameSubstitution.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # Not known at construction time; presumably assigned when this
        # object is registered in the inode table.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        # Maps sanitized filename -> File or Directory object.
        self._entries = {}
        self._mtime = time.time()

    def forward_slash_subst(self):
        """Return the string substituted for '/' in filenames, or None.

        Looked up once from the cluster config key
        Collections.ForwardSlashNameSubstitution and cached in self._fsns.
        An empty value or '/' disables substitution (None).  If the config
        has no such key (old API server), '_' is used.
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Turn an Arvados name into a name usable as a FUSE filename.

        None is passed through.  '', '.' and '..' are mapped to '_', '_'
        and '__' so they cannot collide with the special directory
        entries.  Otherwise '/' is replaced by the ForwardSlashNameSubstitution
        value (if enabled) and any remaining disallowed character is
        replaced by '_'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return whether this directory is backed by a persisted record (never, here)."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        API HTTP errors are logged as warnings and otherwise ignored, so a
        failed refresh leaves the existing entries in place.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn() is a deprecated alias of Logger.warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Mark the directory fresh and touch it in the inode table."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        If anything was added or removed, the directory inode is
        invalidated and its mtime updated.  The directory is always marked
        fresh afterwards.
        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not in found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any direct child is in use."""
        if super(Directory, self).in_use():
            return True
        for v in itervalues(self._entries):
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """Return True if this directory or any direct child holds a reference."""
        if super(Directory, self).has_ref(only_children):
            return True
        for v in itervalues(self._entries):
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            # Recursively clear children before dropping them from the
            # inode table.
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Ask the kernel to forget this directory's dentry in its parent."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in viewitems(parent._entries):
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
258     def rename(self, name_old, name_new, src):
259         raise NotImplementedError()
260
261
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, apiconfig, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
        self.apiconfig = apiconfig
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Create or re-attach the FUSE entry for collection item `item`.

        If the item already has a `fuse_entry`, that entry must be dead and
        still hold an inode; it is revived and placed under `name`
        (presumably the case when an item is moved).  Subcollections become
        CollectionDirectoryBase entries populated recursively; anything else
        becomes a FuseArvadosFile.  The entry is stored back on
        `item.fuse_entry`.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback (registered by populate()).

        Mirrors ADD / DEL / MOD events on this directory's own collection
        into FUSE entries, holding llfuse.lock while doing so.  Events for
        other collections are ignored.
        """
        if collection == self.collection:
            name = self.sanitize_filename(name)
            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    # Tolerate a DEL for a name that has no entry instead of
                    # raising KeyError out of the notify callback.
                    ent = self._entries.pop(name, None)
                    if ent is not None:
                        self.inodes.invalidate_entry(self, name)
                        self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        self.inodes.invalidate_inode(item.fuse_entry)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name])

    def populate(self, mtime):
        """Subscribe to collection events and create entries for its current items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in viewitems(self.collection):
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection (the whole tree, even from a subdirectory)."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` in the collection."""
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subcollection `name` (and any missing parents)."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove file `name` from the collection and save immediately."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove subcollection `name` from the collection and save immediately."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        An existing target is overwritten, subject to POSIX-style checks.

        Raises llfuse.FUSEError with:
        - EPERM if src is not a collection directory;
        - ENOTEMPTY when replacing a non-empty directory;
        - ENOTDIR when moving a directory onto a file;
        - EISDIR when moving a file onto a directory.

        Both collections are saved afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
381         super(CollectionDirectoryBase, self).clear()
382         self.collection = None
383
384
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection.

    The collection is identified by `collection_locator` (a collection
    uuid or a portable data hash) and loaded lazily by update().  A
    virtual `.arvados#collection` file exposes the collection record.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """
        :collection_record: either a collection record dict (its 'uuid' and
        'modified_at' are used) or a locator string, or None.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Poll at half the block signature TTL.
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # writable() falls back to this until a Collection object is loaded.
        # It must exist even without a locator, or writable() would raise
        # AttributeError.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` matches this directory's locator by uuid or PDH."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Only uuid-addressed collections can change, so only they want events."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`."""
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True when the directory is up to date (including when it
        has nothing to fetch, or another thread refreshed it while this
        call waited for the update lock).  On error, the error is logged,
        the directory is invalidated and False is returned.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # A portable data hash names fixed content; nothing to refresh.
                return True

            if self.collection_locator is None:
                # No locator, nothing to fetch.
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed it while we waited; report
                        # success (callers treat a falsy result as failure).
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
                        if 'storage_classes_desired' not in new_collection_record:
                            new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            # Created lazily on first access.
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record so the next access reloads it."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save pending changes (if writable) and stop the collection's threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
557         super(CollectionDirectory, self).clear()
558         self._manifest_size = 0
559
560
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Scratch directory backed by an Arvados collection that is never saved.

    Lets a userspace program use Keep as scratch space.  Reading the
    virtual `.arvados#collection` file returns the current manifest, portable
    data hash and storage classes (serialized via FuncToJSONFile), so a
    snapshot of the scratch data can be saved or used as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations are deliberate no-ops."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
        scratch = self.UnsaveableCollection(
            api_client=api_client, keep_client=api_client.keep,
            num_retries=num_retries, storage_classes_desired=storage_classes)
        super(TmpCollectionDirectory, self).__init__(parent_inode, inodes, api_client.config, scratch)
        # Created lazily by __getitem__ on first access.
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the collection event, then mark the record file stale."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict describing the current scratch collection."""
        with llfuse.lock_released:
            snapshot = {"uuid": None}
            snapshot["manifest_text"] = self.collection.manifest_text()
            snapshot["portable_data_hash"] = self.collection.portable_data_hash()
            snapshot["storage_classes_desired"] = self.collection.storage_classes_desired()
            return snapshot

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
632             self.collection_record_file.invalidate()
633         super(TmpCollectionDirectory, self).invalidate()
634
635
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    Project uuids are resolved the same way unless `pdh_only` is set, in
    which case only portable data hashes are accepted.

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        # Passed on to ProjectDirectory entries created by lookups.
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            # It gets the same storage_classes so projects opened through
            # by_id behave like those opened at the root.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only,
                        storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Look up `k` on the server and add it as an entry if it exists.

        Returns True if `k` is already an entry, or if it is a portable
        data hash / uuid (subject to pdh_only) that resolves to a project or
        loadable collection.  Lookup errors are logged and reported as
        False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        try:
            e = None

            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Another lookup added it meanwhile; drop our duplicate.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Entries are only ever added on demand; never bulk-cleared.
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
726     def want_event_subscribe(self):
727         return not self.pdh_only
728
729
class TagsDirectory(Directory):
    """Directory listing, as subdirectories, every tag visible to the user.

    The listing is polled.  Tags looked up by name that are not in the
    polled listing are remembered in `_extra` so they stay visible.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup, merged into every update().
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Re-list distinct tag names and merge them into the entries."""
        with llfuse.lock_released:
            query = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000)
            tags = query.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(rec):
            return rec['name']

        def is_same_tag(existing, rec):
            return existing.tag == rec['name']

        def make_tag_dir(rec):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                rec['name'], poll=self._poll, poll_time=self._poll_time)

        listed = tags['items'] + [{"name": n} for n in self._extra]
        self.merge(listed, tag_name, is_same_tag, make_tag_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return tag directory `item`, asking the server if it is not listed yet."""
        base = super(TagsDirectory, self)
        if not base.__contains__(item):
            with llfuse.lock_released:
                found = self.api.links().list(
                    filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
                ).execute(num_retries=self.num_retries)
            if found["items"]:
                self._extra.add(item)
                self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
780             pass
781         return False
782
783
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """
        :parent_inode: integer inode number of the containing directory.
        :inodes: inode table the entries are registered in.
        :api: Arvados API client.
        :num_retries: retry count passed to every API request.
        :tag: the tag (link name) whose collections are listed.
        :poll:, :poll_time: polling settings for this directory.
        """
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Rebuild the entries from the tag links named self.tag.

        Each entry is a CollectionDirectory named after the tagged
        collection's uuid, which is the link's head_uuid.
        """
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)
        # NOTE(review): no limit is passed, so only the server's default page
        # of links is returned; confirm whether heavily-used tags get truncated.

        # One collection can carry several tag links with the same name.
        # Keep only the first link per head_uuid, in server order, so merge()
        # receives each collection name once.
        seen = set()
        unique_links = []
        for link in taggedcollections['items']:
            head = link['head_uuid']
            if head not in seen:
                seen.add(head)
                unique_links.append(link)

        self.merge(unique_links,
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
814
815
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        """
        :parent_inode: integer inode number of the containing directory.
        :inodes: inode table the entries are registered in.
        :api: Arvados API client.
        :num_retries: retry count passed to every API request.
        :project_object: API record (dict) of the project; its 'uuid' is a
          group uuid or a user uuid (see update()).
        :poll:, :poll_time: polling settings, also passed to subprojects.
        :storage_classes: if not None, sent as storage_classes_desired for
          collections created by mkdir(), and passed on to subprojects.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes update(); taken while the llfuse lock is released.
        self._updating_lock = threading.Lock()
        # Result of users().current(), fetched lazily by writable().
        self._current_user = None
        # Becomes True once items() is called. Until then update() fetches
        # nothing and __getitem__ looks names up on the server one by one.
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Return a new entry for API record *i*, or None if it isn't shown.

        Collections and links to collections become CollectionDirectory,
        groups become ProjectDirectory, and any other object with a valid
        uuid becomes an ObjectFile. All of them are children of this
        directory.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i.get('head_kind') == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file is placed in this directory, so its parent is this
            # directory's inode, like every other entry created here.
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        """Switch to full listings, then return all entries."""
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the entry name for API record *i*, or None to leave it out.

        Collections, groups and links to collections use their 'name'
        unchanged. Any other Arvados object is named
        "<name>.<kind without the 'arvados#' prefix>". Records without a
        non-empty name, or matching none of these cases, are skipped.
        """
        name = i.get('name')
        if not name:
            return None
        uuid = i.get('uuid')
        if uuid and (collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid)):
            # collection or subproject
            return name
        if uuid and link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name
        kind = i.get('kind')
        if kind and kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[8:])
        return None

    @use_counter
    def update(self):
        """Refresh this project's listing from the API server.

        Always makes sure the '.arvados#project' object file exists. Until
        items() has been called, nothing else is fetched. After that, the
        project record plus its project/filter groups and collections are
        re-listed and merged into the entries.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                # Wait for _updating_lock without holding the llfuse lock.
                self._updating_lock.acquire()
                if not self.stale():
                    # Presumably refreshed by another thread while we waited.
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                # do this in 2 steps until #17424 is fixed
                contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                        order_key="uuid",
                                                        num_retries=self.num_retries,
                                                        uuid=self.project_uuid,
                                                        filters=[["uuid", "is_a", "arvados#group"],
                                                                 ["groups.group_class", "in", ["project","filter"]]]))
                contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
                                                             order_key="uuid",
                                                             num_retries=self.num_retries,
                                                             uuid=self.project_uuid,
                                                             filters=[["uuid", "is_a", "arvados#collection"]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create and register the entry for record *i* under *name*.

        Returns the registered entry. Returns None, without registering
        anything, when createDirectory() declines the record.
        """
        ent = self.createDirectory(i)
        if ent is None:
            return None
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return entry *k*, looking it up on the server by name if needed.

        Without a full listing, an unknown name is searched first among this
        project's project/filter groups, then among its collections. Both
        *k* and its unsanitized form are matched.

        :raises KeyError: if no matching entry exists.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            ent = self._add_entry(contents[0], name)
            if ent is not None:
                return ent

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if *k* exists, looking it up on the server if needed."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user is in the project's writable_by list."""
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called *name* in this project.

        :raises llfuse.FUSEError: EEXIST for any API error, which is logged.
        """
        try:
            with llfuse.lock_released:
                c = {
                    "owner_uuid": self.project_uuid,
                    "name": name,
                    "manifest_text": "" }
                if self.storage_classes is not None:
                    c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection *name* from this project.

        :raises llfuse.FUSEError: ENOENT if missing, EPERM if it is not a
          collection, ENOTEMPTY if the collection has entries.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Resolve the entry once, while still holding the llfuse lock.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        collection_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection *name_old* from project *src* here as *name_new*.

        :raises llfuse.FUSEError: EPERM if *src* is not a project, the entry
          is not a collection, or *name_new* already exists; ENOENT if
          *name_old* does not exist in *src*.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        try:
            ent = src[name_old]
        except KeyError:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from the source directory to this one.
        # Re-parent it too, matching the parent createDirectory() gives new
        # children of this directory.
        del src._entries[name_old]
        ent.parent_inode = self.inode
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a change event for an object that is, or was, in this project.

        Works out the entry name before and after the change and moves,
        adds or removes the entry so this directory matches the new state.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                else:
                    # No-op if createDirectory() declines the record.
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1090
1091
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        """
        :parent_inode: integer inode number of the containing directory.
        :inodes: inode table the entries are registered in.
        :api: Arvados API client.
        :num_retries: retry count passed to every API request.
        :exclude: accepted, but not used by this class.
        :poll: accepted, but not used; polling is always enabled below.
        :poll_time: polling interval, also passed to each ProjectDirectory.
        :storage_classes: passed to each ProjectDirectory created here.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the ``poll`` argument is ignored and polling is always
        # on; confirm this is intended before honoring ``poll``.
        self._poll = True
        self._poll_time = poll_time
        # Serializes update(); taken while the llfuse lock is released.
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes

    @use_counter
    def update(self):
        """Rebuild the top-level entries for projects shared with the current user.

        Uses the groups().shared API method when the server's discovery
        document advertises it. Otherwise it rebuilds the same information
        from full group and user listings. Each entry is a ProjectDirectory
        for either an owner (user or group) of shared projects, or a shared
        project whose owner could not be resolved. Errors are logged, not
        raised.
        """
        try:
            with llfuse.lock_released:
                # Wait for _updating_lock without holding the llfuse lock.
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    page = []
                    while True:
                        resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                                        order="uuid",
                                                        limit=10000,
                                                        count="none",
                                                        include="owner_uuid").execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        # Keyset paging: the next page starts after the last uuid seen.
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        # Owner records requested with include="owner_uuid".
                        for r in resp.get("included", []):
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['group_class','in',['project','filter']]],
                        select=["uuid", "owner_uuid"]))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A visible project whose owner is neither the current
                    # user nor another visible project is a shared root.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        order_key="uuid",
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])

                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                # Owners are named by their group name, or by
                # "first last" for users.
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Shared projects whose owner record is unknown are listed directly.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(viewitems(contents),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        return True