7751: Add --mount-tmp option.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
14
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
17
# Module-level logger used by the directory classes in this file.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
# The character class below covers exactly two characters: NUL and "/".
_disallowed_filename_characters = re.compile('[\x00/]')
25
26 # '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return a version of `dirty` that is usable as a filename.

    None is passed through unchanged.  The empty string and "." both become
    "_", and ".." becomes "__".  In any other name, every character matched
    by _disallowed_filename_characters is replaced with "_".
    """
    if dirty is None:
        return None
    # Names that are replaced wholesale rather than character by character.
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
39
40
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    Subclasses override update() to refresh the entries dict, and override
    the create/mkdir/unlink/rmdir/rename stubs to support modification.
    """

    def __init__(self, parent_inode, inodes):
        """Set up an empty directory.

        :parent_inode: integer inode number of the containing directory

        :inodes: inode table object.  This class calls its add_entry,
        del_entry, invalidate_entry, invalidate_inode and touch methods and
        reads its encoding attribute.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # No inode number yet; it is filled in after construction.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; the generic directory is not persisted."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and is not
        propagated to the caller.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list of (name, entry) pairs (a copy, not a view)."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Nothing is done (and False is returned) when the directory is in use,
        unless force is true.  Each child is cleared first; if any child's
        clear() returns a false value the original entries are restored and
        False is returned.  Returns True once every entry has been removed
        from the inode table and this directory has been invalidated.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            for n in oldentries:
                if not oldentries[n].clear(force):
                    # A child refused to clear: put everything back.
                    self._entries = oldentries
                    return False
            for n in oldentries:
                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                self.inodes.del_entry(oldentries[n])
            self.inodes.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        """Return the stored modification time of this directory."""
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        """No-op in the generic directory."""
        pass

    # The modification operations below are not supported by the generic
    # directory; each one raises NotImplementedError.

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
200
201
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Register `item` in this directory under the sanitized `name`.

        An item that already carries a fuse_entry is re-attached: that entry
        must be marked dead and must still have an inode, otherwise an
        Exception is raised.  Any other item gets a new inode: a
        CollectionDirectoryBase (populated right away) when the item is a
        RichCollectionBase, a FuseArvadosFile otherwise.  In all cases the
        resulting entry is recorded on the item as item.fuse_entry.
        """
        name = sanitize_filename(name)
        has_entry = hasattr(item, "fuse_entry") and item.fuse_entry is not None
        if has_entry:
            previous = item.fuse_entry
            if previous.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if previous.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            previous.dead = False
            self._entries[name] = previous
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, item)
            # Store the entry before populating it, so it stays registered
            # even if populate() fails.
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            new_file = FuseArvadosFile(self.inode, item, mtime)
            self._entries[name] = self.inodes.add_entry(new_file)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback: apply an ADD, DEL or MOD event.

        Events for a collection that does not compare equal to
        self.collection are ignored.  The work is done while holding
        llfuse.lock.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries[name]
                del self._entries[name]
                encoded_name = name.encode(self.inodes.encoding)
                self.inodes.invalidate_entry(self.inode, encoded_name)
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                # Prefer the entry recorded on the item; fall back to a
                # lookup by name.
                if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                    self.inodes.invalidate_inode(item.fuse_entry.inode)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Subscribe to collection events and add an entry for each item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry_name, entry_item in self.collection.items():
            self.new_entry(entry_name, entry_item, self.mtime())

    def writable(self):
        """Report whether the underlying collection is writable."""
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file called `name` in the collection."""
        with llfuse.lock_released:
            handle = self.collection.open(name, "w")
            handle.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create a subdirectory called `name` in the collection."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this one.

        Raises llfuse.FUSEError with EPERM when `src` is not a
        CollectionDirectoryBase.  When the target name already exists:
        ENOTEMPTY for directory-over-non-empty-directory, ENOTDIR for
        directory-over-file, EISDIR for file-over-directory.  Both
        directories are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            existing = self[name_new]
            moving_is_file = isinstance(moving, FuseArvadosFile)
            moving_is_dir = isinstance(moving, CollectionDirectoryBase)
            existing_is_file = isinstance(existing, FuseArvadosFile)
            existing_is_dir = isinstance(existing, CollectionDirectoryBase)

            if moving_is_file and existing_is_file:
                # Overwriting a file with a file is always allowed.
                pass
            elif moving_is_dir and existing_is_dir:
                if len(existing) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_is_dir and existing_is_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_is_file and existing_is_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()
318
319
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up the directory without loading the collection yet.

        :collection_record: either a dict (its 'uuid' is used as the locator
        and its 'modified_at' as the mtime) or the locator itself.

        :explicit_collection: accepted but not used by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot hit an AttributeError
        # when no locator was supplied.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return whether record `i` refers to this directory's locator."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection; fall back to the locator-based guess."""
        return self.collection.writable() if self.collection is not None else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are force-cleared when this directory already has
        an inode.  A non-empty record also updates the mtime, the locator
        and (if instantiated) the .arvados#collection record file.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection behind this directory.

        Returns True when the directory is up to date afterwards (including
        the cases where there was nothing to do), and False when an error
        was caught and logged.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Nothing left to do.  Report success explicitly:
                        # callers treat a falsy result as a failed update.
                        return True

                    _logger.debug("Updating %s", to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            # The record file is created lazily on first lookup.
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        # The special record file is always reported as present.
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when this directory has a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()
475
476
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        """Create a new, empty collection and wrap it as a directory.

        :api_client: API client; its `keep` attribute is used as the Keep
        client for the collection.

        :num_retries: forwarded to the Collection constructor.
        """
        # Forward num_retries instead of silently dropping it.
        collection = arvados.collection.Collection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        # Redirect both save methods so nothing ever saves the collection
        # record through them; see _commit_collection.
        collection.save = self._commit_collection
        collection.save_new = self._commit_collection
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, collection)
        self.collection_record_file = None
        self._subscribed = False
        self._update_collection_record()

    def update(self, *args, **kwargs):
        """Populate the directory on the first call; later calls do nothing."""
        if not self._subscribed:
            with llfuse.lock_released:
                self.populate(self.mtime())
            self._subscribed = True

    @use_counter
    def _commit_collection(self):
        """Commit the data blocks, but don't save the collection to API.

        Update the content of the special .arvados#collection file, if
        it has been instantiated.
        """
        self.collection.flush()
        self._update_collection_record()
        if self.collection_record_file is not None:
            self.collection_record_file.update(self.collection_record)
            self.inodes.invalidate_inode(self.collection_record_file.inode)

    def _update_collection_record(self):
        """Rebuild collection_record from the collection's current state."""
        self.collection_record = {
            "uuid": None,
            "manifest_text": self.collection.manifest_text(),
            "portable_data_hash": self.collection.portable_data_hash(),
        }

    def __contains__(self, k):
        # The special record file is always reported as present.
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    @use_counter
    def __getitem__(self, item):
        if item == '.arvados#collection':
            # The record file is created lazily on first lookup.
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def writable(self):
        """Return True; this directory is always writable."""
        return True

    def finalize(self):
        """Stop the collection's threads without saving anything."""
        self.collection.stop_threads()
543
544
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """Set up the directory.

        :pdh_only: when true, only portable data hashes are accepted as
        names; uuids are rejected by __contains__.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return whether `k` names an entry, loading the collection if needed.

        Names already present are reported immediately.  Otherwise `k` must
        look like a portable data hash (or a uuid, unless pdh_only is set);
        a CollectionDirectory is then created and updated, and kept as an
        entry only if the update succeeds.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Keep the new entry and the caught exception in separate names:
        # cleanup must be given the entry, never the exception object.
        entry = None
        try:
            entry = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))
            found = entry.update()
        except Exception as err:
            _logger.debug('arv-mount exception keep %s', err)
            found = False

        # Re-check the dict: the name may have been added while update() ran.
        if found and k not in self._entries:
            self._entries[k] = entry
            return True

        # Either the lookup failed or the name is already present; in both
        # cases the entry created above is not needed.
        if entry is not None:
            self.inodes.del_entry(entry)
        return bool(found)

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        """Do nothing; this directory's entries are never cleared."""
        pass
618
619
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates each of its entries."""

    def invalidate(self):
        """Invalidate this directory, then every entry it holds.

        Any exception raised along the way is logged with its traceback and
        not propagated.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # with none raises TypeError from inside this handler.
            _logger.exception("Error invalidating directory entries")
628
629
class TagsDirectory(RecursiveInvalidateDirectory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """List the distinct tag names and merge them in as TagDirectory entries."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True)
            tags = request.execute(num_retries=self.num_retries)

        # A response without an "items" key leaves the directory untouched.
        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
652
653
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """List the collections carrying this tag and merge them in."""
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid'])
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def is_same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name,
                   is_same_collection,
                   make_collection_directory)
681
682
683 class ProjectDirectory(Directory):
684     """A special directory that contains the contents of a project."""
685
686     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
687                  poll=False, poll_time=60):
688         super(ProjectDirectory, self).__init__(parent_inode, inodes)
689         self.api = api
690         self.num_retries = num_retries
691         self.project_object = project_object
692         self.project_object_file = None
693         self.project_uuid = project_object['uuid']
694         self._poll = poll
695         self._poll_time = poll_time
696         self._updating_lock = threading.Lock()
697         self._current_user = None
698
699     def createDirectory(self, i):
700         if collection_uuid_pattern.match(i['uuid']):
701             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
702         elif group_uuid_pattern.match(i['uuid']):
703             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
704         elif link_uuid_pattern.match(i['uuid']):
705             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
706                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
707             else:
708                 return None
709         elif uuid_pattern.match(i['uuid']):
710             return ObjectFile(self.parent_inode, i)
711         else:
712             return None
713
714     def uuid(self):
715         return self.project_uuid
716
717     @use_counter
718     def update(self):
719         if self.project_object_file == None:
720             self.project_object_file = ObjectFile(self.inode, self.project_object)
721             self.inodes.add_entry(self.project_object_file)
722
723         def namefn(i):
724             if 'name' in i:
725                 if i['name'] is None or len(i['name']) == 0:
726                     return None
727                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
728                     # collection or subproject
729                     return i['name']
730                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
731                     # name link
732                     return i['name']
733                 elif 'kind' in i and i['kind'].startswith('arvados#'):
734                     # something else
735                     return "{}.{}".format(i['name'], i['kind'][8:])
736             else:
737                 return None
738
739         def samefn(a, i):
740             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
741                 return a.uuid() == i['uuid']
742             elif isinstance(a, ObjectFile):
743                 return a.uuid() == i['uuid'] and not a.stale()
744             return False
745
746         try:
747             with llfuse.lock_released:
748                 self._updating_lock.acquire()
749                 if not self.stale():
750                     return
751
752                 if group_uuid_pattern.match(self.project_uuid):
753                     self.project_object = self.api.groups().get(
754                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
755                 elif user_uuid_pattern.match(self.project_uuid):
756                     self.project_object = self.api.users().get(
757                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
758
759                 contents = arvados.util.list_all(self.api.groups().contents,
760                                                  self.num_retries, uuid=self.project_uuid)
761
762             # end with llfuse.lock_released, re-acquire lock
763
764             self.merge(contents,
765                        namefn,
766                        samefn,
767                        self.createDirectory)
768         finally:
769             self._updating_lock.release()
770
771     @use_counter
772     @check_update
773     def __getitem__(self, item):
774         if item == '.arvados#project':
775             return self.project_object_file
776         else:
777             return super(ProjectDirectory, self).__getitem__(item)
778
779     def __contains__(self, k):
780         if k == '.arvados#project':
781             return True
782         else:
783             return super(ProjectDirectory, self).__contains__(k)
784
785     @use_counter
786     @check_update
787     def writable(self):
788         with llfuse.lock_released:
789             if not self._current_user:
790                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
791             return self._current_user["uuid"] in self.project_object["writable_by"]
792
793     def persisted(self):
794         return True
795
796     @use_counter
797     @check_update
798     def mkdir(self, name):
799         try:
800             with llfuse.lock_released:
801                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
802                                                     "name": name,
803                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
804             self.invalidate()
805         except apiclient_errors.Error as error:
806             _logger.error(error)
807             raise llfuse.FUSEError(errno.EEXIST)
808
809     @use_counter
810     @check_update
811     def rmdir(self, name):
812         if name not in self:
813             raise llfuse.FUSEError(errno.ENOENT)
814         if not isinstance(self[name], CollectionDirectory):
815             raise llfuse.FUSEError(errno.EPERM)
816         if len(self[name]) > 0:
817             raise llfuse.FUSEError(errno.ENOTEMPTY)
818         with llfuse.lock_released:
819             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
820         self.invalidate()
821
822     @use_counter
823     @check_update
824     def rename(self, name_old, name_new, src):
825         if not isinstance(src, ProjectDirectory):
826             raise llfuse.FUSEError(errno.EPERM)
827
828         ent = src[name_old]
829
830         if not isinstance(ent, CollectionDirectory):
831             raise llfuse.FUSEError(errno.EPERM)
832
833         if name_new in self:
834             # POSIX semantics for replacing one directory with another is
835             # tricky (the target directory must be empty, the operation must be
836             # atomic which isn't possible with the Arvados API as of this
837             # writing) so don't support that.
838             raise llfuse.FUSEError(errno.EPERM)
839
840         self.api.collections().update(uuid=ent.uuid(),
841                                       body={"owner_uuid": self.uuid(),
842                                             "name": name_new}).execute(num_retries=self.num_retries)
843
844         # Acually move the entry from source directory to this directory.
845         del src._entries[name_old]
846         self._entries[name_new] = ent
847         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
848
849
850 class SharedDirectory(Directory):
851     """A special directory that represents users or groups who have shared projects with me."""
852
853     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
854                  poll=False, poll_time=60):
855         super(SharedDirectory, self).__init__(parent_inode, inodes)
856         self.api = api
857         self.num_retries = num_retries
858         self.current_user = api.users().current().execute(num_retries=num_retries)
859         self._poll = True
860         self._poll_time = poll_time
861
862     @use_counter
863     def update(self):
864         with llfuse.lock_released:
865             all_projects = arvados.util.list_all(
866                 self.api.groups().list, self.num_retries,
867                 filters=[['group_class','=','project']])
868             objects = {}
869             for ob in all_projects:
870                 objects[ob['uuid']] = ob
871
872             roots = []
873             root_owners = {}
874             for ob in all_projects:
875                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
876                     roots.append(ob)
877                     root_owners[ob['owner_uuid']] = True
878
879             lusers = arvados.util.list_all(
880                 self.api.users().list, self.num_retries,
881                 filters=[['uuid','in', list(root_owners)]])
882             lgroups = arvados.util.list_all(
883                 self.api.groups().list, self.num_retries,
884                 filters=[['uuid','in', list(root_owners)]])
885
886             users = {}
887             groups = {}
888
889             for l in lusers:
890                 objects[l["uuid"]] = l
891             for l in lgroups:
892                 objects[l["uuid"]] = l
893
894             contents = {}
895             for r in root_owners:
896                 if r in objects:
897                     obr = objects[r]
898                     if obr.get("name"):
899                         contents[obr["name"]] = obr
900                     #elif obr.get("username"):
901                     #    contents[obr["username"]] = obr
902                     elif "first_name" in obr:
903                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
904
905
906             for r in roots:
907                 if r['owner_uuid'] not in objects:
908                     contents[r['name']] = r
909
910         # end with llfuse.lock_released, re-acquire lock
911
912         try:
913             self.merge(contents.items(),
914                        lambda i: i[0],
915                        lambda a, i: a.uuid() == i[1]['uuid'],
916                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
917         except Exception:
918             _logger.exception()