16039: Obey ForwardSlashNameSubstitution config in arv-mount.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
10 import logging
11 import re
12 import time
13 import llfuse
14 import arvados
15 import apiclient
16 import functools
17 import threading
18 from apiclient import errors as apiclient_errors
19 import errno
20 import time
21
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
24
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
27
28 _logger = logging.getLogger('arvados.arvados_fuse')
29
30
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
35
36
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes, apiconfig):
        """Create an empty directory.

        :parent_inode: integer inode number of the parent directory

        :inodes: inode table; this class uses it to register, look up,
        invalidate and delete entries

        :apiconfig: callable returning the cluster configuration; read for
        Collections.ForwardSlashNameSubstitution

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /

        Returns 'incoming' unchanged when the configured substitution is
        not a string, is empty, or is '/' itself.
        """
        fsns = self.apiconfig()["Collections"]["ForwardSlashNameSubstitution"]
        if isinstance(fsns, str) and fsns != '' and fsns != '/':
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in the config returned by self.apiconfig().

        None is passed through as None; '', '.' and '..' map to '_', '_'
        and '__'.  Otherwise '/' is replaced with the configured
        substitution (when it is a non-empty string) and any remaining
        disallowed character becomes '_'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.apiconfig()["Collections"]["ForwardSlashNameSubstitution"]
            if isinstance(fsns, str) and fsns != '':
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False: a generic directory has no persistent backing."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and
        swallowed; other exceptions propagate.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn() is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list snapshot of (name, entry) pairs."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        When several items map to the same (sanitized) name, the first one
        that yields an entry wins and later ones are ignored.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if not name:
                continue
            if name in self._entries:
                # An earlier item already produced this name during this
                # merge.  Replacing it here would drop the only reference
                # to its inode without ever calling del_entry() on it.
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries[name]
                del oldentries[name]
            else:
                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        for v in itervalues(self._entries):
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """Return True if this directory (per the base class check) or any
        of its entries reports a reference."""
        if super(Directory, self).has_ref(only_children):
            return True
        for v in itervalues(self._entries):
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate the parent's directory entry that refers to this object."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in viewitems(parent._entries):
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        """Return the time the set of entries last changed."""
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    # The operations below are not supported by the generic directory;
    # subclasses that support them override these methods.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
246
247
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, apiconfig, collection):
        """Create a directory view of 'collection'.

        parent_inode, inodes and apiconfig are passed to Directory
        (which stores apiconfig); 'collection' is the backing collection
        object and may be None until one is loaded.
        """
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry for collection member 'item'.

        'name' is the unsanitized collection filename; it is sanitized
        here.  An item that still carries a dead fuse_entry is reparented
        instead of getting a new inode.

        Raises Exception if the item's existing fuse_entry is not dead or
        no longer has an inode.
        """
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback: mirror an ADD/DEL/MOD event.

        Events for any collection other than self.collection are ignored.
        """
        if collection == self.collection:
            sanitized = self.sanitize_filename(name)
            _logger.debug("collection notify %s %s %s %s", event, collection, sanitized, item)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    # new_entry() sanitizes the name itself; hand it the raw
                    # name so it is sanitized exactly once, the same as the
                    # key used by the DEL and MOD branches below.
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries[sanitized]
                    del self._entries[sanitized]
                    self.inodes.invalidate_entry(self, sanitized)
                    self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        self.inodes.invalidate_inode(item.fuse_entry)
                    elif sanitized in self._entries:
                        self.inodes.invalidate_inode(self._entries[sanitized])

    def populate(self, mtime):
        """Subscribe to collection events and create an entry per member."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in viewitems(self.collection):
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with the llfuse lock released."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file 'name' in the collection."""
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory 'name' in the collection."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove file 'name' from the collection and save."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove directory 'name' from the collection and save."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move 'name_old' from directory 'src' to 'name_new' in this one.

        Raises llfuse.FUSEError with EPERM if 'src' is not a collection
        directory, ENOTEMPTY when replacing a non-empty directory,
        ENOTDIR when a directory would replace a file, and EISDIR when a
        file would replace a directory.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
369
370
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a collection root directory; nothing is loaded until update().

        :collection_record: either a collection record dict (its 'uuid'
        is used as the locator and 'modified_at' as the mtime) or a
        locator string, or None

        :explicit_collection: accepted but not used by this constructor
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Poll at half the blob signature TTL (default TTL: two hours).
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception as e:
            # The module never imports sys, so log the exception type from
            # the bound exception rather than through sys.exc_info().
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", type(e))
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable: writable() reads it whenever no
        # collection is loaded, including when there is no locator.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record 'i' has this directory's locator as its
        uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection; before loading, fall back to whether
        the locator is a uuid."""
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        return (uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of 'coll_reader'.

        Existing entries are cleared first if this directory already has
        an inode.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True when the directory is up to date afterwards
        (including when there was nothing to do).  On error the problem
        is logged, the directory is invalidated and False is returned.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Loaded by portable data hash: nothing to refresh.
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed the directory while we
                        # waited for the lock.  Report success: callers
                        # treat a falsy result as a failed lookup.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields a file exposing
        the collection record, created on first access."""
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached collection record and record file, then
        invalidate the directory."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if it is writable and stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the collection's threads, delete all entries and reset the
        manifest size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
543
544
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        # Both save operations are deliberately no-ops.
        def save(self):
            return None

        def save_new(self):
            return None

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        """Create the scratch collection and populate the (empty) directory."""
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the collection event, then invalidate the record file
        (if one has been created) so it reflects the new contents."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        record_file = self.collection_record_file
        if not record_file:
            return
        with llfuse.lock:
            record_file.invalidate()
        self.inodes.invalidate_inode(record_file)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict (uuid None, manifest text, portable data
        hash) from the current collection state."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
        return {
            "uuid": None,
            "manifest_text": manifest,
            "portable_data_hash": pdh,
        }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields a file backed
        by collection_record(), created on first access."""
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file, if any, and then the directory."""
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
616
617
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """Create the directory.

        :pdh_only: when true, only portable data hashes are accepted as
        names; uuids are rejected by __contains__
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if 'k' is an existing entry or can be loaded.

        A name that is not yet an entry must look like a portable data
        hash or (unless pdh_only) a uuid.  Group uuids are looked up as
        projects; anything else is loaded as a collection.  Any failure
        is logged and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Bound before the try block so the error handler can always test it.
        e = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # The entry appeared while we were loading; discard ours.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for 'item', loading it on demand.

        Raises KeyError if no such collection or project can be found.
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Deliberately a no-op: entries are kept.
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
709
710
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        """Create the directory; polling is enabled with period 'poll_time'."""
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup in __getitem__; they are merged
        # into every subsequent listing.
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the tag subdirectories from the API server.

        Lists up to 1000 distinct tag names and merges them with the
        names in self._extra.
        """
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        if "items" in tags:
            # Skip extras the server already listed, so that merge() never
            # receives the same tag name twice.
            listed = set(i['name'] for i in tags['items'])
            extras = [{"name": n} for n in self._extra if n not in listed]
            self.merge(tags['items']+extras,
                       lambda i: i['name'],
                       lambda a, i: a.tag == i['name'],
                       lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the directory for tag 'item'.

        If the tag is not among the current entries, the server is asked
        for it directly; when found it is remembered in self._extra and
        the listing is refreshed.  Raises KeyError if the tag still has
        no entry.
        """
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
        if tags["items"]:
            self._extra.add(item)
            self.update()
        return super(TagsDirectory, self).__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if tag 'k' is an entry or can be found via __getitem__."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False
763
764
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """Create the directory for tag name 'tag'."""
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Fetch the tag links that point at collections and merge one
        CollectionDirectory per head_uuid into this directory."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                )
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            # The directory entry is named after the collection uuid.
            return link['head_uuid']

        def unchanged(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'], entry_name, unchanged, make_directory)
795
796
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are the subprojects and collections owned by
    self.project_uuid.  Until items() has been called the directory is
    filled lazily: __getitem__ asks the API server for one name at a
    time.  After that, update() lists everything owned by the project.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Store the API client and the project (or user) record.

        project_object must carry a 'uuid' key; it is exposed as the
        '.arvados#project' entry once update() has run.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # ObjectFile wrapping project_object, created lazily by update().
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes the listing done in update().
        self._updating_lock = threading.Lock()
        # Cached users().current() response, fetched by writable().
        self._current_user = None
        # Set by items(); makes update() list the whole project.
        self._full_listing = False

    def want_event_subscribe(self):
        """Always return True."""
        return True

    def createDirectory(self, i):
        """Build the directory entry object for API record `i`.

        Returns a CollectionDirectory for collections (and for links
        whose head is a collection or a portable data hash), a
        ProjectDirectory for groups, an ObjectFile for any other UUID,
        and None for anything else.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # NOTE(review): update() passes self.inode to ObjectFile
            # but this passes self.parent_inode -- confirm intended.
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        """Return the UUID of the project this directory represents."""
        return self.project_uuid

    def items(self):
        """Return the directory entries, switching to full-listing mode.

        Setting _full_listing makes subsequent update() calls list the
        whole project instead of returning early.
        """
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the (unsanitized) entry name for API record `i`, or None.

        Collections, subprojects and name links to collections use
        their 'name' unchanged.  Other records with an 'arvados#...'
        kind get "<name>.<kind without the 'arvados#' prefix>".
        Records with a missing or empty name, or matching none of the
        above, yield None.
        """
        if 'name' not in i:
            return None
        name = i['name']
        if name is None or len(name) == 0:
            return None

        # Tolerate partial records (e.g. event attributes) that lack
        # 'uuid' or 'head_kind' instead of raising KeyError.
        uuid = i.get('uuid') or ''
        if collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name
        if 'kind' in i and i['kind'].startswith('arvados#'):
            # something else
            return "{}.{}".format(name, i['kind'][8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, in full-listing mode, its contents.

        Always makes sure the '.arvados#project' ObjectFile exists.
        Returns True right away unless items() has switched the
        directory to full-listing mode.  Otherwise, if the directory is
        stale, re-fetches the project (or user) record, lists the
        subprojects and collections it owns, and merges them in.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            # An existing entry is kept when it refers to the same
            # UUID (and, for plain object files, is not stale).
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                # Take the update lock only after the llfuse lock has
                # been released; the API calls below also run without
                # the llfuse lock.
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().list,
                                                 self.num_retries,
                                                 filters=[["owner_uuid", "=", self.project_uuid],
                                                          ["group_class", "=", "project"]])
                contents.extend(arvados.util.list_all(self.api.collections().list,
                                                      self.num_retries,
                                                      filters=[["owner_uuid", "=", self.project_uuid]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the entry for record `i`, register it and store it as `name`."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named `k`, fetching it on demand if needed.

        '.arvados#project' maps to the project's ObjectFile.  In
        full-listing mode, or when `k` is already cached, the base
        class lookup is used.  Otherwise the API server is asked for a
        subproject, then a collection, owned by this project whose
        name is `k` or its unsanitized form.

        Raises KeyError when nothing matches or the match does not
        sanitize back to `k`.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
            if k2 == k:
                namefilter = ["name", "=", k]
            else:
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "=", "project"],
                                                       namefilter],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                namefilter],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if contents:
            # Result items are indexed like dicts everywhere else in
            # this class (see namefn), so use item access here too.
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use
                # "foo[SUBST]bar".
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if `k` is '.arvados#project' or self[k] succeeds."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user is listed in the project's writable_by.

        The users().current() response is fetched once and cached.
        """
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        """Always return True."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called `name` owned by this project.

        Any API client error is logged and reported to FUSE as EEXIST.
        """
        try:
            with llfuse.lock_released:
                self.api.collections().create(body={"owner_uuid": self.project_uuid,
                                                    "name": name,
                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection `name` from this project.

        Raises FUSEError with ENOENT if there is no such entry, EPERM
        if the entry is not a CollectionDirectory, and ENOTEMPTY if it
        still has entries.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Resolve the entry once, while the llfuse lock is still held,
        # rather than indexing the directory again for every check.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        collection_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection `name_old` from project `src` here as `name_new`.

        Only collections can be moved, only between ProjectDirectory
        objects, and never onto an existing name; every other case
        raises FUSEError(EPERM).
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): unlike the other API calls in this class, this
        # one does not release the llfuse lock -- confirm intended.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a create/update/delete event for an object to this directory.

        The old and new entry names are derived from the event's
        old_attributes / new_attributes; entries are then added,
        renamed or removed so the directory matches.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    # Rename: keep the existing entry object.
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                # Removed from this project: drop the inode as well.
                self.inodes.del_entry(ent)
1059
1060
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Store the API client and fetch the current user record.

        NOTE(review): `exclude` and `poll` are accepted but not used;
        polling is always switched on -- confirm this is intended.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        # Serializes the listing done in update().
        self._updating_lock = threading.Lock()

    @use_counter
    def update(self):
        """Rebuild the list of owners and projects shared with the user.

        Uses the groups "shared" API method when the discovery
        document advertises it, and otherwise derives the shared roots
        from a listing of all projects.  Each resulting name is merged
        in as a ProjectDirectory.  Any exception is logged and
        swallowed.
        """
        try:
            with llfuse.lock_released:
                # Take the update lock only after the llfuse lock has
                # been released; the API calls below also run without
                # the llfuse lock.
                self._updating_lock.acquire()
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    # Page through the results ordered by uuid: each
                    # request asks for uuids greater than the last one
                    # already seen.
                    page = []
                    while True:
                        resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page,
                                                        order="uuid",
                                                        limit=10000,
                                                        count="none",
                                                        include="owner_uuid").execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = arvados.util.list_all(
                        self.api.groups().list, self.num_retries,
                        filters=[['group_class','=','project']],
                        select=["uuid", "owner_uuid"])
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A project is a shared root when its owner is
                    # neither the current user nor another listed
                    # project.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.list_all(
                        self.api.users().list, self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.list_all(
                        self.api.groups().list, self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])

                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                # Owners are listed by "name" when set, otherwise by
                # "first_name last_name".
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        #elif obr.get("username"):
                        #    contents[obr["username"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Root projects whose owner record was not retrieved
                # are listed directly under their own name.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(viewitems(contents),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            self._updating_lock.release()

    def want_event_subscribe(self):
        """Always return True."""
        return True