16100: Move test to integration suite to avoid logging errors.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
10 import logging
11 import re
12 import time
13 import llfuse
14 import arvados
15 import apiclient
16 import functools
17 import threading
18 from apiclient import errors as apiclient_errors
19 import errno
20 import time
21
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
24
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
27
28 _logger = logging.getLogger('arvados.arvados_fuse')
29
30
# Pattern for every character that neither FUSE nor Linux can accept
# inside a filename.  (A collection filename containing one of them
# shows up in the fuse mount with an underscore in its place.)
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return a name derived from `dirty` that is usable as a directory entry.

    None is passed through unchanged.  The empty string and '.' both
    become '_', '..' becomes '__', and in any other name each NUL or '/'
    character is replaced with '_'.
    """
    if dirty is None:
        return None
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
49
50
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory

        :inodes: the inode table; this class uses it to add, delete, touch
        and invalidate entries, and to look up the parent by inode number

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # Not assigned here; stays None until something outside this class
        # sets it.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; subclasses may override."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and
        swallowed; any other exception propagates.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        # Return a list (a snapshot) rather than a view of the live dict.
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.  It may return None, in
        which case nothing is added for that item.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            # Only bump the mtime when the listing actually changed.
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        for v in itervalues(self._entries):
            if v.in_use():
                return True
        return False

    def has_ref(self, only_children):
        """Return True if this directory or any of its entries has a reference.

        only_children is passed to the base class check for this directory;
        entries are always queried with has_ref(False).
        """
        if super(Directory, self).has_ref(only_children):
            return True
        for v in itervalues(self._entries):
            if v.has_ref(False):
                return True
        return False

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate the parent's directory entry that refers to this directory."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in viewitems(parent._entries):
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        """Return the stored modification time of this directory."""
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        """Do nothing; subclasses with pending changes override this."""
        pass

    # The operations below are not implemented by the generic directory;
    # subclasses that support them override the corresponding method.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
232
233
class CollectionDirectoryBase(Directory):
    """Directory view of an Arvados `Collection` object.

    Used directly for subcollections, and as the base class of
    CollectionDirectory, which adds loading and saving of Collection
    records.

    Most operations are applied to the underlying `Collection` object
    only.  The `Collection` reports additions, removals and modifications
    through the notify callback `CollectionDirectoryBase.on_event`, and
    that callback is where FUSE inodes and directory entries get created,
    deleted or invalidated.

    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry called `name` for collection member `item`.

        An item that already carries a fuse_entry is reparented (the entry
        must be dead and still have an inode, otherwise Exception is
        raised).  A subcollection gets a new CollectionDirectoryBase which
        is then populated; anything else gets a FuseArvadosFile.  The
        resulting entry is recorded on the item as `fuse_entry`.
        """
        name = sanitize_filename(name)
        reused = getattr(item, "fuse_entry", None)
        if reused is not None:
            if reused.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if reused.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            reused.dead = False
            self._entries[name] = reused
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = self.inodes.add_entry(
                CollectionDirectoryBase(self.inode, self.inodes, item))
            # Register the subdirectory before filling it in.
            self._entries[name] = subdir
            subdir.populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(
                FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Notify callback: react to a change reported by a collection.

        Events about any collection other than this directory's own are
        ignored.  The handling itself runs with llfuse.lock held.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries.pop(name)
                self.inodes.invalidate_entry(self, name)
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                # Prefer the entry recorded on the item; fall back to
                # whatever this directory has under that name.
                if getattr(item, "fuse_entry", None) is not None:
                    self.inodes.invalidate_inode(item.fuse_entry)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name])

    def populate(self, mtime):
        """Subscribe to the collection and create an entry for each member."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for member_name, member in viewitems(self.collection):
            self.new_entry(member_name, member, self.mtime())

    def writable(self):
        """Report whether the underlying collection is writable."""
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released meanwhile."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create `name` in the collection by opening it for writing and closing it."""
        with llfuse.lock_released:
            new_file = self.collection.open(name, "w")
            new_file.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create directory `name` in the collection."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove directory `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EPERM when src is not a collection
        directory, ENOTEMPTY when a directory would replace a non-empty
        directory, ENOTDIR when a directory would replace a file, and
        EISDIR when a file would replace a directory.  Both directories
        are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            existing = self[name_new]
            if isinstance(moving, CollectionDirectoryBase):
                if isinstance(existing, CollectionDirectoryBase):
                    if len(existing) > 0:
                        raise llfuse.FUSEError(errno.ENOTEMPTY)
                elif isinstance(existing, FuseArvadosFile):
                    raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(moving, FuseArvadosFile):
                if isinstance(existing, CollectionDirectoryBase):
                    raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Drop all entries and the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
354
355
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up a collection directory; the collection is loaded by update().

        :api: API client; also provides the Keep client (api.keep)

        :num_retries: passed to the collection objects created in update()

        :collection_record: either a collection record dict (its 'uuid'
        becomes the locator and 'modified_at' the mtime) or a locator
        string

        :explicit_collection: accepted but not used by this class
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Poll at half the blob signature TTL (default TTL: 2 hours).
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception as e:
            # The original handler called sys.exc_info() without importing
            # sys, so the fallback itself raised NameError.  Log the
            # exception type directly instead.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", type(e))
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot hit AttributeError
        # when there is no locator and no collection loaded yet.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` has this directory's locator as uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection; before loading, use the guess made from the locator."""
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Return True when the locator matches the uuid pattern."""
        return (uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are cleared first if this directory already has
        an inode.  The record, mtime and locator are taken from
        `new_collection_record` when it is non-empty.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True when the directory is up to date afterwards.  On any
        error the problem is logged, the directory is invalidated, and
        False is returned.

        :to_record_version: record version the caller wants to reach; an
        already-loaded collection is only updated if it does not report
        this version as a known past version.
        """
        try:
            # A loaded record addressed by portable data hash needs no refresh.
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    # Wait for the per-directory lock without holding
                    # llfuse.lock.
                    self._updating_lock.acquire()
                    if not self.stale():
                        # No longer stale by the time we got the lock, so
                        # there is nothing to do.  This is success: the
                        # original bare `return` yielded None, which
                        # callers testing the result treated as failure.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        # Only reached after one of the handlers above.
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the collection record file.

        The record file is created and registered the first time it is
        requested.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        # The record file is always reported as present.
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the collection record and its file, then invalidate the directory."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when this directory has a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, and stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the collection's threads, drop all entries, and reset the size estimate."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
528
529
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Directory backed by an Arvados collection that is never saved.

    Lets Keep be used as scratch space.  A userspace program can read
    the .arvados#collection file to obtain the current manifest, e.g. to
    snapshot the scratch data or to use it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations do nothing."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as usual, then invalidate the record file if there is one."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict for the current state of the collection.

        The manifest text and portable data hash are computed with
        llfuse.lock released; 'uuid' is always None.
        """
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
            }

    def __contains__(self, k):
        # The record file is always reported as present.
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The record file is created and registered on first request.
        """
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        """Stop the collection's threads; nothing is saved."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if any) and then the directory itself."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
601
602
class MagicDirectory(Directory):
    """Special directory that logically holds every extant keep locator.

    A name that is looked up is checked to see whether it is a valid keep
    locator for a manifest; if it is, the manifest contents are loaded as
    a subdirectory named after the locator.  Listing all extant locators
    is impractical, so readdir() only shows collections that have already
    been accessed.

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        """Set the attribute; on first inode assignment, add the built-in entries."""
        super(MagicDirectory, self).__setattr__(name, value)
        # Only act when a real inode has just been assigned and nothing
        # has been added yet.  The checks are ordered so that _entries is
        # not read for other attributes or while the inode is still None.
        if name != 'inode' or self.inode is None or self._entries:
            return

        readme = StringFile(self.inode, self.README_TEXT, time.time())
        self._entries['README'] = self.inodes.add_entry(readme)
        # The root directory also gets an identical by_id subdirectory.
        if self.inode == llfuse.ROOT_INODE:
            self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if `k` is, or can be loaded as, an entry.

        Unknown names that look like a portable data hash (or, unless
        pdh_only is set, a uuid) are looked up: group uuids as projects,
        everything else as collections.  Any exception during the lookup
        is logged and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k):
            if self.pdh_only or not uuid_pattern.match(k):
                return False

        candidate = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                candidate = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
            else:
                candidate = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, k))

            if not candidate.update():
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(candidate)
                return False

            if k in self._entries:
                # An entry for k appeared in the meantime; discard ours.
                self.inodes.del_entry(candidate)
            else:
                self._entries[k] = candidate
            return True
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if candidate is not None:
                self.inodes.del_entry(candidate)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`, loading it if needed; KeyError if unavailable."""
        if item not in self:
            raise KeyError("No collection with id " + item)
        return self._entries[item]

    def clear(self):
        """Do nothing: entries of the magic directory are kept."""
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
693
694
class TagsDirectory(Directory):
    """Special directory with one subdirectory per tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup in __getitem__; they are merged
        # into every listing built by update().
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the listing from tag links (at most 1000 distinct names)."""
        with llfuse.lock_released:
            listing = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        if "items" not in listing:
            return

        def tag_name(link):
            return link['name']

        def same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        wanted = listing['items']+[{"name": extra} for extra in self._extra]
        self.merge(wanted, tag_name, same_tag, make_tag_directory)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the directory for tag `item`.

        If the tag is not in the current listing, the API is asked for it
        directly; when found, it is remembered and the listing is rebuilt.
        Raises KeyError if the tag still is not present afterwards.
        """
        base = super(TagsDirectory, self)
        if base.__contains__(item):
            return base.__getitem__(item)
        with llfuse.lock_released:
            found = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
        if found["items"]:
            self._extra.add(item)
            self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if tag `k` is listed or can be found by direct lookup."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        else:
            return True
747
748
class TagDirectory(Directory):
    """Special directory whose subdirectories are the collections visible to
    the user that carry one particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Refresh the listing from the tag links that point at collections."""
        link_filters = [['link_class', '=', 'tag'],
                        ['name', '=', self.tag],
                        ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            tagged = self.api.links().list(
                filters=link_filters,
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(tagged['items'], entry_name, same_collection, make_collection_directory)
779
780
781 class ProjectDirectory(Directory):
782     """A special directory that contains the contents of a project."""
783
784     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
785                  poll=False, poll_time=60):
786         super(ProjectDirectory, self).__init__(parent_inode, inodes)
787         self.api = api
788         self.num_retries = num_retries
789         self.project_object = project_object
790         self.project_object_file = None
791         self.project_uuid = project_object['uuid']
792         self._poll = poll
793         self._poll_time = poll_time
794         self._updating_lock = threading.Lock()
795         self._current_user = None
796         self._full_listing = False
797
798     def want_event_subscribe(self):
799         return True
800
801     def createDirectory(self, i):
802         if collection_uuid_pattern.match(i['uuid']):
803             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
804         elif group_uuid_pattern.match(i['uuid']):
805             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
806         elif link_uuid_pattern.match(i['uuid']):
807             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
808                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
809             else:
810                 return None
811         elif uuid_pattern.match(i['uuid']):
812             return ObjectFile(self.parent_inode, i)
813         else:
814             return None
815
816     def uuid(self):
817         return self.project_uuid
818
819     def items(self):
820         self._full_listing = True
821         return super(ProjectDirectory, self).items()
822
823     def namefn(self, i):
824         if 'name' in i:
825             if i['name'] is None or len(i['name']) == 0:
826                 return None
827             elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
828                 # collection or subproject
829                 return i['name']
830             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
831                 # name link
832                 return i['name']
833             elif 'kind' in i and i['kind'].startswith('arvados#'):
834                 # something else
835                 return "{}.{}".format(i['name'], i['kind'][8:])
836         else:
837             return None
838
839
840     @use_counter
841     def update(self):
842         if self.project_object_file == None:
843             self.project_object_file = ObjectFile(self.inode, self.project_object)
844             self.inodes.add_entry(self.project_object_file)
845
846         if not self._full_listing:
847             return True
848
849         def samefn(a, i):
850             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
851                 return a.uuid() == i['uuid']
852             elif isinstance(a, ObjectFile):
853                 return a.uuid() == i['uuid'] and not a.stale()
854             return False
855
856         try:
857             with llfuse.lock_released:
858                 self._updating_lock.acquire()
859                 if not self.stale():
860                     return
861
862                 if group_uuid_pattern.match(self.project_uuid):
863                     self.project_object = self.api.groups().get(
864                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
865                 elif user_uuid_pattern.match(self.project_uuid):
866                     self.project_object = self.api.users().get(
867                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
868
869                 contents = arvados.util.list_all(self.api.groups().list,
870                                                  self.num_retries,
871                                                  filters=[["owner_uuid", "=", self.project_uuid],
872                                                           ["group_class", "=", "project"]])
873                 contents.extend(arvados.util.list_all(self.api.collections().list,
874                                                       self.num_retries,
875                                                       filters=[["owner_uuid", "=", self.project_uuid]]))
876
877             # end with llfuse.lock_released, re-acquire lock
878
879             self.merge(contents,
880                        self.namefn,
881                        samefn,
882                        self.createDirectory)
883             return True
884         finally:
885             self._updating_lock.release()
886
887     def _add_entry(self, i, name):
888         ent = self.createDirectory(i)
889         self._entries[name] = self.inodes.add_entry(ent)
890         return self._entries[name]
891
892     @use_counter
893     @check_update
894     def __getitem__(self, k):
895         if k == '.arvados#project':
896             return self.project_object_file
897         elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
898             return super(ProjectDirectory, self).__getitem__(k)
899         with llfuse.lock_released:
900             contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
901                                                        ["group_class", "=", "project"],
902                                                        ["name", "=", k]],
903                                               limit=1).execute(num_retries=self.num_retries)["items"]
904             if not contents:
905                 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
906                                                                 ["name", "=", k]],
907                                                        limit=1).execute(num_retries=self.num_retries)["items"]
908         if contents:
909             name = sanitize_filename(self.namefn(contents[0]))
910             if name != k:
911                 raise KeyError(k)
912             return self._add_entry(contents[0], name)
913
914         # Didn't find item
915         raise KeyError(k)
916
917     def __contains__(self, k):
918         if k == '.arvados#project':
919             return True
920         try:
921             self[k]
922             return True
923         except KeyError:
924             pass
925         return False
926
927     @use_counter
928     @check_update
929     def writable(self):
930         with llfuse.lock_released:
931             if not self._current_user:
932                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
933             return self._current_user["uuid"] in self.project_object.get("writable_by", [])
934
935     def persisted(self):
936         return True
937
938     @use_counter
939     @check_update
940     def mkdir(self, name):
941         try:
942             with llfuse.lock_released:
943                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
944                                                     "name": name,
945                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
946             self.invalidate()
947         except apiclient_errors.Error as error:
948             _logger.error(error)
949             raise llfuse.FUSEError(errno.EEXIST)
950
951     @use_counter
952     @check_update
953     def rmdir(self, name):
954         if name not in self:
955             raise llfuse.FUSEError(errno.ENOENT)
956         if not isinstance(self[name], CollectionDirectory):
957             raise llfuse.FUSEError(errno.EPERM)
958         if len(self[name]) > 0:
959             raise llfuse.FUSEError(errno.ENOTEMPTY)
960         with llfuse.lock_released:
961             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
962         self.invalidate()
963
964     @use_counter
965     @check_update
966     def rename(self, name_old, name_new, src):
967         if not isinstance(src, ProjectDirectory):
968             raise llfuse.FUSEError(errno.EPERM)
969
970         ent = src[name_old]
971
972         if not isinstance(ent, CollectionDirectory):
973             raise llfuse.FUSEError(errno.EPERM)
974
975         if name_new in self:
976             # POSIX semantics for replacing one directory with another is
977             # tricky (the target directory must be empty, the operation must be
978             # atomic which isn't possible with the Arvados API as of this
979             # writing) so don't support that.
980             raise llfuse.FUSEError(errno.EPERM)
981
982         self.api.collections().update(uuid=ent.uuid(),
983                                       body={"owner_uuid": self.uuid(),
984                                             "name": name_new}).execute(num_retries=self.num_retries)
985
986         # Acually move the entry from source directory to this directory.
987         del src._entries[name_old]
988         self._entries[name_new] = ent
989         self.inodes.invalidate_entry(src, name_old)
990
991     @use_counter
992     def child_event(self, ev):
993         properties = ev.get("properties") or {}
994         old_attrs = properties.get("old_attributes") or {}
995         new_attrs = properties.get("new_attributes") or {}
996         old_attrs["uuid"] = ev["object_uuid"]
997         new_attrs["uuid"] = ev["object_uuid"]
998         old_name = sanitize_filename(self.namefn(old_attrs))
999         new_name = sanitize_filename(self.namefn(new_attrs))
1000
1001         # create events will have a new name, but not an old name
1002         # delete events will have an old name, but not a new name
1003         # update events will have an old and new name, and they may be same or different
1004         # if they are the same, an unrelated field changed and there is nothing to do.
1005
1006         if old_attrs.get("owner_uuid") != self.project_uuid:
1007             # Was moved from somewhere else, so don't try to remove entry.
1008             old_name = None
1009         if ev.get("object_owner_uuid") != self.project_uuid:
1010             # Was moved to somewhere else, so don't try to add entry
1011             new_name = None
1012
1013         if old_attrs.get("is_trashed"):
1014             # Was previously deleted
1015             old_name = None
1016         if new_attrs.get("is_trashed"):
1017             # Has been deleted
1018             new_name = None
1019
1020         if new_name != old_name:
1021             ent = None
1022             if old_name in self._entries:
1023                 ent = self._entries[old_name]
1024                 del self._entries[old_name]
1025                 self.inodes.invalidate_entry(self, old_name)
1026
1027             if new_name:
1028                 if ent is not None:
1029                     self._entries[new_name] = ent
1030                 else:
1031                     self._add_entry(new_attrs, new_name)
1032             elif ent is not None:
1033                 self.inodes.del_entry(ent)
1034
1035
1036 class SharedDirectory(Directory):
1037     """A special directory that represents users or groups who have shared projects with me."""
1038
1039     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1040                  poll=False, poll_time=60):
1041         super(SharedDirectory, self).__init__(parent_inode, inodes)
1042         self.api = api
1043         self.num_retries = num_retries
1044         self.current_user = api.users().current().execute(num_retries=num_retries)
1045         self._poll = True
1046         self._poll_time = poll_time
1047         self._updating_lock = threading.Lock()
1048
1049     @use_counter
1050     def update(self):
1051         try:
1052             with llfuse.lock_released:
1053                 self._updating_lock.acquire()
1054                 if not self.stale():
1055                     return
1056
1057                 contents = {}
1058                 roots = []
1059                 root_owners = set()
1060                 objects = {}
1061
1062                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1063                 if 'httpMethod' in methods.get('shared', {}):
1064                     page = []
1065                     while True:
1066                         resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page,
1067                                                         order="uuid",
1068                                                         limit=10000,
1069                                                         count="none",
1070                                                         include="owner_uuid").execute()
1071                         if not resp["items"]:
1072                             break
1073                         page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1074                         for r in resp["items"]:
1075                             objects[r["uuid"]] = r
1076                             roots.append(r["uuid"])
1077                         for r in resp["included"]:
1078                             objects[r["uuid"]] = r
1079                             root_owners.add(r["uuid"])
1080                 else:
1081                     all_projects = arvados.util.list_all(
1082                         self.api.groups().list, self.num_retries,
1083                         filters=[['group_class','=','project']],
1084                         select=["uuid", "owner_uuid"])
1085                     for ob in all_projects:
1086                         objects[ob['uuid']] = ob
1087
1088                     current_uuid = self.current_user['uuid']
1089                     for ob in all_projects:
1090                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1091                             roots.append(ob['uuid'])
1092                             root_owners.add(ob['owner_uuid'])
1093
1094                     lusers = arvados.util.list_all(
1095                         self.api.users().list, self.num_retries,
1096                         filters=[['uuid','in', list(root_owners)]])
1097                     lgroups = arvados.util.list_all(
1098                         self.api.groups().list, self.num_retries,
1099                         filters=[['uuid','in', list(root_owners)+roots]])
1100
1101                     for l in lusers:
1102                         objects[l["uuid"]] = l
1103                     for l in lgroups:
1104                         objects[l["uuid"]] = l
1105
1106                 for r in root_owners:
1107                     if r in objects:
1108                         obr = objects[r]
1109                         if obr.get("name"):
1110                             contents[obr["name"]] = obr
1111                         #elif obr.get("username"):
1112                         #    contents[obr["username"]] = obr
1113                         elif "first_name" in obr:
1114                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1115
1116                 for r in roots:
1117                     if r in objects:
1118                         obr = objects[r]
1119                         if obr['owner_uuid'] not in objects:
1120                             contents[obr["name"]] = obr
1121
1122             # end with llfuse.lock_released, re-acquire lock
1123
1124             self.merge(viewitems(contents),
1125                        lambda i: i[0],
1126                        lambda a, i: a.uuid() == i[1]['uuid'],
1127                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
1128         except Exception:
1129             _logger.exception("arv-mount shared dir error")
1130         finally:
1131             self._updating_lock.release()
1132
1133     def want_event_subscribe(self):
1134         return True