Merge branch '11843-arpi-transient-error'
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import logging
6 import re
7 import time
8 import llfuse
9 import arvados
10 import apiclient
11 import functools
12 import threading
13 from apiclient import errors as apiclient_errors
14 import errno
15 import time
16
17 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
_logger = logging.getLogger('arvados.arvados_fuse')


# Characters that neither FUSE nor Linux can carry inside a filename.
# Wherever one occurs in a collection filename, the fuse mount shows an
# underscore instead.
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return a copy of `dirty` that is safe to use as a FUSE filename.

    None is passed through unchanged.  The empty string and '.' both
    become '_', '..' becomes '__', and in any other name each NUL or '/'
    character is replaced with '_'.
    """
    if dirty is None:
        return None

    # Names that have to be replaced as a whole rather than per character.
    whole_name_replacements = {'': '_', '.': '_', '..': '__'}
    if dirty in whole_name_replacements:
        return whole_name_replacements[dirty]

    return _disallowed_filename_characters.sub('_', dirty)
44
45
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory

        :inodes: the inode table object; this class calls its add_entry,
        del_entry, touch, invalidate_entry and invalidate_inode methods and
        reads its `encoding` attribute

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # The inode number is not known yet at construction time.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    # Overridden by subclasses to implement logic to update the entries dict
    # when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and not
        propagated, so a failed refresh leaves the current entries in place.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.  It may return None, in
        which case no entry is added for that item.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        # dict.values() works on both Python 2 and 3 (itervalues() does not).
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children):
        """Return True if this directory or any of its entries has a reference.

        `only_children` is forwarded to the base class check for this
        directory itself; entries are always asked with False.
        """
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[n])
        self.inodes.invalidate_inode(self.inode)
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate every entry name, every entry, and this inode."""
        # Iterate over a snapshot: items() is portable across Python 2 and 3,
        # and the copy keeps the loop safe while calling into child entries.
        for n, e in list(self._entries.items()):
            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
            e.kernel_invalidate()
        self.inodes.invalidate_inode(self.inode)

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    # The operations below are not supported by the generic directory;
    # subclasses that support them override them.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
222
223
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry for `item` under the sanitized `name`.

        An item that already carries a fuse_entry is reparented (the entry
        must be dead and still have an inode); a RichCollectionBase item
        becomes a populated subdirectory; anything else becomes a
        FuseArvadosFile.  The resulting entry is stored back on the item as
        `fuse_entry`.
        """
        safe_name = sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            entry = item.fuse_entry
            if entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            entry.dead = False
            self._entries[safe_name] = entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, item)
            entry = self.inodes.add_entry(subdir)
            # Register the subdirectory before filling it in.
            self._entries[safe_name] = entry
            entry.populate(mtime)
        else:
            entry = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
            self._entries[safe_name] = entry
        item.fuse_entry = entry

    def on_event(self, event, collection, name, item):
        """Collection notify callback: mirror ADD/DEL/MOD events into FUSE.

        Events for any collection other than self.collection are ignored.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries[name]
                del self._entries[name]
                self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                # Prefer the entry attached to the item itself; fall back to
                # looking the name up in this directory.
                if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                    self.inodes.invalidate_inode(item.fuse_entry.inode)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Subscribe to collection events and create entries for all items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry_name, entry_item in self.collection.items():
            self.new_entry(entry_name, entry_item, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with the llfuse lock released."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` by opening it for writing and closing it."""
        with llfuse.lock_released:
            writer = self.collection.open(name, "w")
            writer.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EPERM if `src` is not a
        CollectionDirectoryBase, and with ENOTEMPTY, ENOTDIR or EISDIR when
        an existing target is incompatible with the source entry.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            ent_is_file = isinstance(ent, FuseArvadosFile)
            ent_is_dir = isinstance(ent, CollectionDirectoryBase)
            tgt_is_file = isinstance(tgt, FuseArvadosFile)
            tgt_is_dir = isinstance(tgt, CollectionDirectoryBase)

            if ent_is_file and tgt_is_file:
                # file over file: allowed
                pass
            elif ent_is_dir and tgt_is_dir:
                # directory over directory: only if the target is empty
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif ent_is_dir and tgt_is_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif ent_is_file and tgt_is_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
344
345
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up a collection root that is loaded lazily by update().

        :api: API client; also used to read 'blobSignatureTtl' from its
        discovery document to pick the poll interval

        :num_retries: passed on to the collection objects created by update()

        :collection_record: either a dict with at least a 'uuid' key (and
        optionally 'modified_at'), or a locator string, or None

        :explicit_collection: accepted but not used by this constructor
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            # Log the exception directly: this module does not import `sys`,
            # so calling sys.exc_info() here would itself raise NameError.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", e)
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot hit AttributeError
        # when there is no locator.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` matches this locator by uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection if there is one, else use the locator-based guess."""
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Return True only when the locator is a uuid (False for no locator)."""
        if self.collection_locator is None:
            return False
        return (uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the current contents with `coll_reader` and repopulate.

        Existing entries are cleared first if this directory already has an
        inode.  A truthy record also updates mtime, the locator, and the
        '.arvados#collection' record file if one exists.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True when the directory is up to date afterwards (including
        the cases where nothing needed to be done).  On any error the
        failure is logged, the directory is invalidated, and False is
        returned.
        """
        try:
            # A record addressed by portable data hash needs no refresh.
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread already refreshed us.  Report
                        # success explicitly: a bare `return` yields None,
                        # which truth-testing callers treat as failure.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        # Only reached after one of the handlers above ran.
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The record file is created and registered on first access.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
516
517
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations do nothing."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        """Create the backing scratch collection and populate the directory."""
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as usual, then invalidate the record file, if any."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        record_file = self.collection_record_file
        if not record_file:
            return
        with llfuse.lock:
            record_file.invalidate()
        self.inodes.invalidate_inode(record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Return a record dict describing the collection's current state.

        The uuid is always None; manifest text and portable data hash are
        computed from the collection with the llfuse lock released.
        """
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
            }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The record file is created and registered on first access.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                record_file = FuncToJSONFile(self.inode, self.collection_record)
                self.collection_record_file = record_file
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if created) and then the directory."""
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
589
590
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """Create the magic directory.

        :pdh_only: when true, only portable data hashes (not uuids) are
        accepted as collection names
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if `k` is an entry, loading the collection on demand.

        Names already present are accepted immediately.  Otherwise `k` must
        look like a portable data hash (or a uuid, unless pdh_only is set);
        a CollectionDirectory is then created and kept only if its update()
        succeeds.  Any exception during that process is logged at debug
        level and reported as "not present".
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Bind `e` before the try block so the handler below can tell whether
        # an inode entry was actually registered and needs cleaning up.
        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                # update() may have released the lock; another lookup could
                # have registered the same name in the meantime.
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self.inode, k)
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`; raises KeyError if it cannot be loaded."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Deliberately a no-op: overrides Directory.clear() so that the
        # entries of this directory are never dropped by it.
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
669
670
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup in __getitem__; update() adds
        # them to whatever the tag listing returns.
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List tag names through the API and merge them into the entries."""
        with llfuse.lock_released:
            response = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        if "items" not in response:
            return

        def tag_name(i):
            return i['name']

        def same_tag(a, i):
            return a.tag == i['name']

        def make_tag_directory(i):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)

        listing = response['items']+[{"name": n} for n in self._extra]
        self.merge(listing, tag_name, same_tag, make_tag_directory)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the directory for tag `item`.

        If the tag is not among the current entries, the API is asked for a
        single tag link with that name; on a hit the name is remembered and
        the listing is refreshed before the final lookup.
        """
        base = super(TagsDirectory, self)
        if base.__contains__(item):
            return base.__getitem__(item)
        with llfuse.lock_released:
            found = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
        if found["items"]:
            self._extra.add(item)
            self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if `k` is an entry or can be found by __getitem__."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
723
724
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List the collections carrying this tag and merge them into the entries."""
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            response = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)

        def entry_name(i):
            return i['head_uuid']

        def same_collection(a, i):
            return a.collection_locator == i['head_uuid']

        def make_collection_directory(i):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])

        self.merge(response['items'], entry_name, same_collection,
                   make_collection_directory)
755
756
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are built from the ``groups().contents`` listing for
    ``project_uuid``: collections and collection links become
    CollectionDirectory entries, subprojects become nested
    ProjectDirectory entries, and other objects with a recognized uuid
    become ObjectFile entries.  The project record itself is exposed
    under the name '.arvados#project'.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Set up a directory for the project described by project_object.

        parent_inode and inodes are passed through to Directory.__init__.
        api is the Arvados API client used for every request and
        num_retries is handed to each execute() call.  project_object is
        the API record of the project (or user); its 'uuid' is kept as
        project_uuid.  poll and poll_time are stored and handed on to
        nested ProjectDirectory entries.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Created lazily by update(); served as '.arvados#project'.
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes concurrent update() calls on this directory.
        self._updating_lock = threading.Lock()
        # Cached result of users().current(), filled in by writable().
        self._current_user = None

    def want_event_subscribe(self):
        """Report that this directory wants event subscription (always True)."""
        return True

    def createDirectory(self, i):
        """Build the directory entry object for API record i.

        Returns a CollectionDirectory, ProjectDirectory or ObjectFile
        depending on the kind of uuid in i['uuid'], or None when the
        record should not appear in the directory.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            # Only links that point at a collection are shown.
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # NOTE(review): every other entry is parented at self.inode;
            # self.parent_inode here may be unintentional -- confirm.
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        """Return the uuid of the project this directory represents."""
        return self.project_uuid

    @use_counter
    def update(self):
        """Refresh the project record and the directory entries from the API.

        The first call also creates the ObjectFile served as
        '.arvados#project'.  If the directory is not stale once the
        update lock has been obtained, nothing else is done.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            # Entry name for API record i, or None to leave it out.
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else: append the kind without its
                    # 'arvados#' prefix as a suffix
                    return "{}.{}".format(i['name'], i['kind'][8:])
            else:
                return None

        def samefn(a, i):
            # True when existing entry a still represents API record i.
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                # Another thread may have finished an update while this
                # one was waiting for the lock.
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().contents,
                                                 self.num_retries, uuid=self.project_uuid)

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       namefn,
                       samefn,
                       self.createDirectory)
        finally:
            self._updating_lock.release()

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named item; '.arvados#project' is the project record file."""
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """Return True for '.arvados#project' and for any regular entry name."""
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user is listed in the project's 'writable_by'.

        The current user record is fetched from the API on first use and
        cached.  A project record without a 'writable_by' field is
        treated as not writable instead of raising KeyError.
        """
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        """Report that this directory is persisted (always True)."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called name, owned by this project.

        Raises llfuse.FUSEError(EEXIST) if the API request fails; the
        underlying API error is logged.
        """
        try:
            with llfuse.lock_released:
                self.api.collections().create(body={"owner_uuid": self.project_uuid,
                                                    "name": name,
                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            # Every API failure is reported to the caller as EEXIST.
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection called name from this project.

        Raises llfuse.FUSEError with ENOENT if there is no such entry,
        EPERM if the entry is not a CollectionDirectory, and ENOTEMPTY if
        the collection still has entries.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Look the entry up once, and resolve its uuid before the llfuse
        # lock is released, so that the API call below does not have to
        # read directory state.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        ent_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=ent_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection name_old out of src into this project as name_new.

        src must be a ProjectDirectory and the entry being moved must be
        a CollectionDirectory; name_new must not already exist here.
        Raises llfuse.FUSEError(EPERM) if any of these does not hold.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): unlike mkdir/rmdir, this request is made without
        # releasing the llfuse lock -- confirm whether that is deliberate.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
925
926
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Set up the directory and look up the current user.

        parent_inode and inodes are passed through to Directory.__init__.
        api is the Arvados API client and num_retries is handed to each
        execute() call.  poll_time is stored and handed on to the
        ProjectDirectory entries created by update().
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the 'poll' and 'exclude' arguments are not used;
        # polling is always switched on here -- confirm this is intended.
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the list of users, groups and projects shared with me.

        Lists all projects, and picks as "roots" those that are not owned
        by the current user and whose owner is not one of the listed
        projects.  The owners of the roots are then looked up among users
        and groups: each owner that is found becomes an entry named after
        its 'name' (or "first_name last_name"), and each root whose owner
        could not be found becomes an entry under its own name.

        An exception raised while merging the entries is logged and not
        propagated.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            # An owner uuid may belong to either a user or a group, so
            # both are queried with the same uuid list.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if obr.get("name"):
                        contents[obr["name"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            # Roots whose owner could not be looked up are listed directly.
            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # without one raises TypeError from inside this handler.
            _logger.exception("arv-mount shared dir error")

    def want_event_subscribe(self):
        """Report that this directory wants event subscription (always True)."""
        return True