13146: FUSE uses the "groups.shared" API for shared/ directory.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import logging
6 import re
7 import time
8 import llfuse
9 import arvados
10 import apiclient
11 import functools
12 import threading
13 from apiclient import errors as apiclient_errors
14 import errno
15 import time
16
17 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
_logger = logging.getLogger('arvados.arvados_fuse')


# Pattern matching every character that neither FUSE nor Linux can
# accept inside a filename.  (When such a character occurs in a
# collection filename, the fuse mount shows an underscore instead.)
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return a name that is safe to use as a directory entry.

    None is passed through unchanged.  The empty string and '.' map to
    '_', '..' maps to '__'; in any other name each disallowed character
    is replaced with '_'.
    """
    if dirty is None:
        return None
    if dirty == '' or dirty == '.':
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
44
45
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: integer inode number of the parent directory

        :inodes: inode table object used to add, delete and invalidate
        entries

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        HTTP errors raised by the API client are logged as warnings
        instead of being propagated to the caller.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn() is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.  May return None, in which
        case no entry is added.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if not name:
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries.pop(name)
            else:
                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not found in 'items'
        for name, ent in oldentries.items():
            _logger.debug("Forgetting about entry '%s' on inode %i", name, self.inode)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        # values() (rather than the Python 2-only itervalues()) works on
        # both Python 2 and Python 3.
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children):
        """Return True if this directory or any of its entries has a ref.

        :only_children: passed through to the base class check for this
        directory itself; entries are always queried with False.
        """
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for ent in oldentries.values():
            ent.clear()
            self.inodes.del_entry(ent)
        self.invalidate()

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k, v in parent._entries.items():
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    # The operations below are not supported by the generic directory;
    # subclasses that support them override these methods.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
227
228
class CollectionDirectoryBase(Directory):
    """Expose an Arvados Collection as a directory.

    Subcollections are represented directly by this class; it is also the
    parent class of CollectionDirectory, which adds loading and saving of
    Collection records.

    Most operations are simply forwarded to the underlying Arvados
    `Collection` object.  That object reports additions, removals and
    modifications through a notify callback registered as
    `CollectionDirectoryBase.on_event`, and FUSE inodes and directory
    entries are created, deleted or invalidated in reaction to those
    notifications.

    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Register `item` under the sanitized form of `name`.

        An item that already carries a (dead) fuse entry is re-attached;
        a collection item becomes a nested CollectionDirectoryBase; any
        other item becomes a FuseArvadosFile.
        """
        name = sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            revived = item.fuse_entry
            if revived.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if revived.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            revived.dead = False
            self._entries[name] = revived
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = self.inodes.add_entry(
                CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name] = subdir
            subdir.populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(
                FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Notify callback: mirror a collection change into the directory."""
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries.pop(name)
                self.inodes.invalidate_entry(self, name)
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                # Prefer the entry attached to the item itself; fall back
                # to looking the name up in this directory.
                if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                    self.inodes.invalidate_inode(item.fuse_entry)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name])

    def populate(self, mtime):
        """Subscribe to the collection and create entries for its items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for child_name, child in self.collection.items():
            self.new_entry(child_name, child, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with the llfuse lock released."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file by opening it for writing and closing it."""
        with llfuse.lock_released:
            handle = self.collection.open(name, "w")
            handle.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this one.

        Raises llfuse.FUSEError with EPERM when `src` is not a collection
        directory, ENOTEMPTY when replacing a non-empty directory, ENOTDIR
        when a directory would replace a file, and EISDIR when a file would
        replace a directory.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            moving_file = isinstance(ent, FuseArvadosFile)
            moving_dir = isinstance(ent, CollectionDirectoryBase)
            onto_file = isinstance(tgt, FuseArvadosFile)
            onto_dir = isinstance(tgt, CollectionDirectoryBase)
            if moving_file and onto_file:
                # file replacing file is always permitted
                pass
            elif moving_dir and onto_dir:
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_dir and onto_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_file and onto_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Drop all entries and detach from the collection object."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
349
350
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up a collection root directory.

        :api: API client object; its discovery document is consulted for
        'blobSignatureTtl' and its `keep` attribute is used when loading
        the collection

        :num_retries: retry count passed to the collection objects

        :collection_record: either a dict with at least a 'uuid' key (and
        optionally 'modified_at'), or a locator value, or None

        :explicit_collection: accepted for interface compatibility; not
        used by this constructor
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Poll at half the blob signature TTL (default TTL: 2 hours).
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            # Log the exception class without relying on the sys module,
            # which this file does not import.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", type(e))
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot raise AttributeError
        # when there is no locator and no collection loaded yet.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` refers to this directory's locator."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        return (uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are cleared first if this directory already has
        an inode.  When `new_collection_record` is non-empty, the mtime,
        locator and (if present) the record file are refreshed from it.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Bring the directory up to date with its collection.

        Returns True on success (including when nothing needed to be
        done) and False when an error occurred; errors are logged and the
        directory is invalidated rather than raising.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Already refreshed while we waited for the lock:
                        # this is success, so report True (a bare return
                        # would look like failure to callers testing the
                        # result).
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        # Only reached after one of the handled errors above.
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            # The record file is created lazily on first access.
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the collection's threads, drop all entries, reset size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
523
524
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Directory backed by an Arvados collection that is never saved.

    This lets Keep serve as scratch space.  A userspace program can read
    the .arvados#collection file to obtain a current manifest, either to
    snapshot the scratch data or to use it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations are no-ops."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event, then invalidate the record file if it exists."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict from the collection's current state."""
        with llfuse.lock_released:
            record = {
                "uuid": None,
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
            }
            return record

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        # Create the record file lazily, the first time it is requested.
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
596
597
class MagicDirectory(Directory):
    """Special directory that logically holds every extant keep locator.

    When lookup() references a name, the name is checked for being a valid
    keep locator to a manifest; if it is, the manifest contents are loaded
    as a subdirectory of this directory named after the locator.  Listing
    every extant keep locator is impractical, so readdir() only shows
    collections that have already been accessed.

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # Only act when a real inode has just been assigned and the
        # directory is still empty.  The checks run in this order so that
        # _entries is not consulted unless an inode is set.
        if name != 'inode' or self.inode is None or self._entries:
            return
        # When we're assigned an inode, add a README.
        readme = StringFile(self.inode, self.README_TEXT, time.time())
        self._entries['README'] = self.inodes.add_entry(readme)
        # If we're the root directory, add an identical by_id subdirectory.
        if self.inode == llfuse.ROOT_INODE:
            self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if `k` names an entry, loading it on demand.

        Names that are neither a portable data hash nor (unless pdh_only
        is set) a uuid are rejected without contacting the server.  Any
        exception during lookup is logged and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k):
            if self.pdh_only or not uuid_pattern.match(k):
                return False

        entry = None
        try:
            if group_uuid_pattern.match(k):
                listing = self.api.groups().list(
                    filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if listing[u'items_available'] == 0:
                    return False
                entry = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, listing[u'items'][0]))
            else:
                entry = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, k))

            if not entry.update():
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(entry)
                return False

            if k in self._entries:
                # An entry for k appeared in the meantime; discard ours.
                self.inodes.del_entry(entry)
            else:
                self._entries[k] = entry
            return True
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if entry is not None:
                self.inodes.del_entry(entry)
            return False

    def __getitem__(self, item):
        if item not in self:
            raise KeyError("No collection with id " + item)
        return self._entries[item]

    def clear(self):
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
688
689
class TagsDirectory(Directory):
    """Special directory whose subdirectories are the tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup, merged into every listing.
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the tag listing from the API and merge it in."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                )
            tags = request.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(i):
            return i['name']

        def is_same(a, i):
            return a.tag == i['name']

        def make_dir(i):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)

        listing = tags['items']+[{"name": n} for n in self._extra]
        self.merge(listing, tag_name, is_same, make_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry for tag `item`, querying the API if unknown."""
        base = super(TagsDirectory, self)
        if base.__contains__(item):
            return base.__getitem__(item)
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            )
            tags = request.execute(num_retries=self.num_retries)
        if tags["items"]:
            # Remember the tag so update() includes it in the listing.
            self._extra.add(item)
            self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        else:
            return True
742
743
class TagDirectory(Directory):
    """Special directory whose subdirectories are the collections, visible
    to the user, that carry one particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List tag links for this tag and merge the collections in."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                )
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(i):
            return i['head_uuid']

        def is_same(a, i):
            return a.collection_locator == i['head_uuid']

        def make_dir(i):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])

        self.merge(taggedcollections['items'], entry_name, is_same, make_dir)
774
775
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Subprojects and collections owned by the project are exposed as
    subdirectories.  The complete listing is only fetched once items() has
    been called; until then entries are resolved one name at a time in
    __getitem__.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Set up the directory for *project_object*.

        project_object is the API record (a dict with at least 'uuid') of
        the group or user whose contents this directory shows.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Created lazily by update(); served as '.arvados#project'.
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes refreshes performed in update().
        self._updating_lock = threading.Lock()
        # Cached result of users().current(), filled in by writable().
        self._current_user = None
        # Becomes True once a caller has asked for the whole listing.
        self._full_listing = False

    def want_event_subscribe(self):
        """Always True for project directories."""
        return True

    def createDirectory(self, i):
        """Build the directory entry object for API record *i*.

        Returns a CollectionDirectory for collections (and for links whose
        head is a collection), a ProjectDirectory for groups, an ObjectFile
        for any other object with a well-formed UUID, or None when the
        record cannot be represented.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        """Return the UUID of the project shown by this directory."""
        return self.project_uuid

    def items(self):
        """Return the directory entries, switching to full-listing mode."""
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the (unsanitized) entry name for record *i*, or None.

        Collections, subprojects and name links to collections use their
        'name' as is; other objects get '<name>.<kind without "arvados#">'.
        None is returned when the record has no usable name or kind.
        """
        if 'name' not in i:
            return None

        name = i['name']
        if name is None or len(name) == 0:
            return None

        # Tolerate records without these keys instead of raising KeyError
        # from some branches only.
        uuid = i.get('uuid') or ''
        kind = i.get('kind') or ''

        if collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name
        if kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, in full-listing mode, its contents.

        Returns True after a completed pass (or when no full listing has
        been requested yet) and None when another refresh already made the
        directory fresh.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        # Track ownership so the finally clause never releases a lock that
        # was not acquired (which would raise and hide the original error).
        lock_held = False
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                lock_held = True
                if not self.stale():
                    # Someone else refreshed while we waited for the lock.
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().list,
                                                 self.num_retries,
                                                 filters=[["owner_uuid", "=", self.project_uuid],
                                                          ["group_class", "=", "project"]])
                contents.extend(arvados.util.list_all(self.api.collections().list,
                                                      self.num_retries,
                                                      filters=[["owner_uuid", "=", self.project_uuid]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            if lock_held:
                self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the entry for record *i*, register it under *name*, return it."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named *k*, querying the API server if needed.

        Raises KeyError when no subproject or collection of that name is
        owned by the project.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            # Subprojects take precedence over collections of the same name.
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "=", "project"],
                                                       ["name", "=", k]],
                                              limit=1).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                ["name", "=", k]],
                                                       limit=1).execute(num_retries=self.num_retries)["items"]
        if contents:
            name = sanitize_filename(self.namefn(contents[0]))
            if name != k:
                # The object's sanitized name differs from the requested
                # filename, so it is not reachable under *k*.
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Report whether *k* names an entry (may trigger an API lookup)."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
            return True
        except KeyError:
            pass
        return False

    @use_counter
    @check_update
    def writable(self):
        """Report whether the current user is listed in the project's writable_by."""
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object["writable_by"]

    def persisted(self):
        """Always True for project directories."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called *name* in this project.

        Any API client error is logged and reported as FUSEError(EEXIST).
        """
        try:
            with llfuse.lock_released:
                self.api.collections().create(body={"owner_uuid": self.project_uuid,
                                                    "name": name,
                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection *name* from this project.

        Raises FUSEError with ENOENT if there is no such entry, EPERM if the
        entry is not a collection, and ENOTEMPTY if it still has content.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)

        # Resolve the entry (and its UUID) once, while the llfuse lock is
        # still held, rather than indexing self again with the lock released.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        collection_uuid = ent.uuid()

        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection *name_old* from project directory *src* here as *name_new*.

        Raises FUSEError(EPERM) when *src* is not a ProjectDirectory, when
        the entry is not a collection, or when *name_new* already exists.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): unlike mkdir/rmdir this request is issued without
        # releasing the llfuse lock -- confirm whether that is intentional.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a create/update/delete event for an object to the entries."""
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = sanitize_filename(self.namefn(old_attrs))
        new_name = sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    # Renamed in place: keep the existing entry object.
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                # Removed from this project: drop the entry entirely.
                self.inodes.del_entry(ent)
1029
1030
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me.

    Each entry is a ProjectDirectory, either for the owner (user or group)
    of one or more shared root projects, or for a shared root project
    itself when its owner's record is not available.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Set up the directory and look up the current user.

        'exclude' and 'poll' are accepted but not used: this directory
        always enables polling, every poll_time seconds.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        # Serializes refreshes performed in update().
        self._updating_lock = threading.Lock()

    @use_counter
    def update(self):
        """Rebuild the list of owners and root projects shared with the user.

        Uses the "groups.shared" API when the server advertises it and
        otherwise derives the same information from full project, user and
        group listings.  Any error is logged rather than propagated.
        """
        # Track ownership so the finally clause never releases a lock that
        # was not acquired (which would raise and hide the original error).
        lock_held = False
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                lock_held = True
                if not self.stale():
                    # Someone else refreshed while we waited for the lock.
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    # Page through the results by ascending uuid.
                    page = []
                    while True:
                        # Full records are requested (no 'select') because
                        # the project name is needed below and the record
                        # becomes the ProjectDirectory's project object.
                        resp = self.api.groups().shared(
                            filters=[['group_class', '=', 'project']]+page,
                            order="uuid",
                            limit=10000,
                            count="none").execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["include"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = arvados.util.list_all(
                        self.api.groups().list, self.num_retries,
                        filters=[['group_class','=','project']],
                        select=["uuid", "owner_uuid"])
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A root is a project owned neither by the current user
                    # nor by another project in the listing.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.list_all(
                        self.api.users().list, self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.list_all(
                        self.api.groups().list, self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])

                    # Replace the partial records with the full ones.
                    for l in lusers:
                        objects[l["uuid"]] = l
                    for l in lgroups:
                        objects[l["uuid"]] = l

                # One entry per owner whose record is available: groups by
                # name, users by "first_name last_name".
                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                # Root projects whose owner record is not available are
                # listed directly under their own name.
                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            if lock_held:
                self._updating_lock.release()

    def want_event_subscribe(self):
        """Always True for the shared directory."""
        return True