Merge branch '7832-pysdk-use-slots' refs #7832
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
# Module-level logger used by the directory classes in this file.
_logger = logging.getLogger('arvados.arvados_fuse')
20
21
# Pattern for the characters that neither FUSE nor Linux accept as part of
# a filename.  (A collection filename containing one of them shows up in the
# fuse mount with an underscore in its place.)
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return a variant of `dirty` that is usable as a directory entry name.

    None is passed through unchanged.  The empty string and '.' both become
    '_', '..' becomes '__', and in any other name every NUL or '/' character
    is replaced with '_'.
    """
    if dirty is None:
        return None

    # Names that cannot be used verbatim, mapped to their stand-ins.
    reserved = {'': '_', '.': '_', '..': '__'}
    if dirty in reserved:
        return reserved[dirty]

    return _disallowed_filename_characters.sub('_', dirty)
40
41
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Set up an empty directory.

        :parent_inode: the integer inode number of the parent directory

        :inodes: object used to register, remove and invalidate entries; this
        class calls its touch(), add_entry(), del_entry(), invalidate_entry()
        and invalidate_inode() methods and reads its `encoding` attribute.

        Raises Exception when parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # This directory's own inode number starts out unset.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        # Maps entry name -> File or Directory object.
        self._entries = {}
        self._mtime = time.time()

    # Overridden by subclasses to implement logic to update the entries dict
    # when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; subclasses override this where appropriate."""
        return False

    def checkupdate(self):
        """Call update() if stale() reports true.

        An apiclient HttpError raised by update() is logged as a warning and
        not propagated; any other exception is left to the caller.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named `item`; raises KeyError if there is none."""
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list of (name, entry) pairs (a copy, not a live view)."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return whether an entry named `k` exists."""
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        """Return the number of entries."""
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inodes object, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for name in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", name, self.inode)
            self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[name])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Nothing is done (and False is returned) when in_use() is true and
        `force` is not set.  Otherwise every entry's own clear(force) is
        called first; if any of them returns a false value the original
        entries dict is put back and False is returned.  On success each
        entry is invalidated and removed through the inodes object, this
        directory is invalidated, and True is returned.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            for n in oldentries:
                if not oldentries[n].clear(force):
                    # A child refused to clear: restore the previous entries.
                    self._entries = oldentries
                    return False
            for n in oldentries:
                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                self.inodes.del_entry(oldentries[n])
            self.inodes.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        """Return the stored modification time."""
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        """Do nothing; subclasses override this where appropriate."""
        pass

    # The mutating operations below are not supported by the generic
    # directory; each one raises NotImplementedError.

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
201
202
class CollectionDirectoryBase(Directory):
    """Directory view of an Arvados Collection.

    Used directly for subcollections, and as the base class of
    CollectionDirectory, which adds loading/saving of Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object reports additions, removals and modifications
    through a notify callback to `CollectionDirectoryBase.on_event`, and FUSE
    inodes and directory entries are created, deleted or invalidated in
    response to those events.

    """

    def __init__(self, parent_inode, inodes, collection):
        """Create the directory and remember the collection that backs it."""
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry for `item` under the sanitized `name`.

        An item that already carries a `fuse_entry` has that entry reused,
        which is only permitted when the entry is marked dead and still has
        an inode (otherwise Exception is raised).  A RichCollectionBase item
        becomes a populated CollectionDirectoryBase; anything else becomes a
        FuseArvadosFile.  In every case the resulting entry is stored back on
        `item.fuse_entry`.
        """
        name = sanitize_filename(name)
        has_entry = hasattr(item, "fuse_entry") and item.fuse_entry is not None
        if has_entry:
            reused = item.fuse_entry
            if reused.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if reused.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            reused.dead = False
            self._entries[name] = reused
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, item)
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            fileobj = FuseArvadosFile(self.inode, item, mtime)
            self._entries[name] = self.inodes.add_entry(fileobj)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback: mirror ADD/DEL/MOD events into FUSE.

        Events for a collection other than self.collection are ignored.  The
        work is done while holding llfuse.lock.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                # pop() raises KeyError for an unknown name, like a lookup would.
                removed = self._entries.pop(name)
                self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                item_has_entry = hasattr(item, "fuse_entry") and item.fuse_entry is not None
                if item_has_entry:
                    self.inodes.invalidate_inode(item.fuse_entry.inode)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Set the mtime, subscribe to collection events, and add all items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for child_name, child in self.collection.items():
            self.new_entry(child_name, child, self.mtime())

    def writable(self):
        """Return whatever the backing collection reports for writable()."""
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with the llfuse lock released."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create `name` by opening it for writing and closing it at once."""
        with llfuse.lock_released:
            handle = self.collection.open(name, "w")
            handle.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create the subcollection `name` via collection.mkdirs()."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EPERM when `src` is not a
        CollectionDirectoryBase, and with ENOTEMPTY / ENOTDIR / EISDIR when an
        existing target is a non-empty directory, a file replaced by a
        directory, or a directory replaced by a file, respectively.  Both
        directories are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            target = self[name_new]
            moving_is_file = isinstance(moving, FuseArvadosFile)
            moving_is_dir = isinstance(moving, CollectionDirectoryBase)
            target_is_file = isinstance(target, FuseArvadosFile)
            target_is_dir = isinstance(target, CollectionDirectoryBase)

            if moving_is_file and target_is_file:
                # file over file: allowed, the target is overwritten
                pass
            elif moving_is_dir and target_is_dir:
                if len(target) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_is_dir and target_is_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_is_file and target_is_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()
319
320
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create the directory; the collection itself is loaded by update().

        :collection_record: either a dict (its 'uuid' becomes the locator and
        its 'modified_at' the mtime) or the locator itself, or None.

        :explicit_collection: accepted but not used by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so that writable() cannot hit an
        # AttributeError when no locator was supplied and no collection has
        # been loaded yet.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return whether record `i` has this locator as its uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection; before loading, answer from the locator form."""
        return self.collection.writable() if self.collection is not None else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are force-cleared when this directory already has an
        inode.  A non-empty record updates the mtime, the locator and, when
        present, the collection record file.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True when the directory is up to date afterwards (including
        the cases where there was nothing to do), and False when an error was
        caught and logged.
        """
        try:
            # A record fetched by portable data hash needs no refresh.
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Nothing left to do.  Report success like the other
                        # "up to date" paths instead of returning None.
                        return True

                    _logger.debug("Updating %s", to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The record file is created and registered with the inodes object the
        first time it is requested.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """Report '.arvados#collection' as always present; defer otherwise."""
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached record and record file, then invalidate as usual."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return whether a collection locator is set."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()
476
477
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This lets Keep be used as scratch space.  A userspace program can read
    the .arvados#collection file to obtain a current manifest, either to
    snapshot the scratch data or to use it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save() and save_new() do nothing."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        """Create a fresh unsaveable collection and populate the directory from it."""
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as the base class does, then invalidate the record file."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict (uuid None, manifest text, portable data hash).

        The collection is queried with the llfuse lock released.
        """
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
            }

    def __contains__(self, k):
        """Report '.arvados#collection' as always present; defer otherwise."""
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The record file is created from self.collection_record and registered
        with the inodes object the first time it is requested.
        """
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        """Return False: this directory is never saved."""
        return False

    def writable(self):
        """Return True: the scratch collection is always writable."""
        return True

    def finalize(self):
        """Stop the collection's threads."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file, if there is one, and then this directory."""
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
545
546
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """Create the directory.

        :pdh_only: when true, only names matching the portable data hash
        pattern are looked up; uuid-style names are rejected.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        """Set the attribute, adding the default entries once an inode is assigned."""
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return whether `k` is an entry, loading the collection on demand.

        A name that is not already an entry is accepted only if it matches the
        portable data hash pattern or (unless pdh_only) the uuid pattern.  A
        CollectionDirectory is then registered and updated; it is kept as an
        entry if the update succeeds and removed again otherwise.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Keep the new entry and the caught exception under different names:
        # reusing one name made the handler pass the exception object to
        # del_entry(), and left the name unbound if construction itself failed.
        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`, loading it on demand; KeyError if unavailable."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        """Do nothing (and return None): entries of this directory are not cleared."""
        pass
621
622
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates every entry it holds."""

    def invalidate(self):
        """Invalidate this directory and then each of its entries.

        Any exception raised along the way is logged with its traceback and
        not propagated.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it with
            # none raised TypeError from inside this handler.
            _logger.exception("Error invalidating directory entries")
631
632
class TagsDirectory(RecursiveInvalidateDirectory):
    """Special directory with one subdirectory per tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        """Create the directory; polling is always enabled, every `poll_time`."""
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """List the distinct tag link names and merge them in as TagDirectory entries.

        The API request runs with the llfuse lock released.  Nothing is merged
        when the response has no "items" key.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                )
            tags = request.execute(num_retries=self.num_retries)

        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, same_tag, make_tag_directory)
655
656
class TagDirectory(Directory):
    """Special directory with one subdirectory per collection visible to the
    user that carries a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """Create the directory for the tag named `tag`."""
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """List the collections linked to this tag and merge them in.

        The API request runs with the llfuse lock released.  Each listed
        link's head_uuid becomes both the entry name and the locator of a
        CollectionDirectory.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                )
            taggedcollections = request.execute(num_retries=self.num_retries)

        def head_uuid(link):
            return link['head_uuid']

        def same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   head_uuid,
                   same_collection,
                   make_collection_directory)
684
685
686 class ProjectDirectory(Directory):
687     """A special directory that contains the contents of a project."""
688
689     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
690                  poll=False, poll_time=60):
691         super(ProjectDirectory, self).__init__(parent_inode, inodes)
692         self.api = api
693         self.num_retries = num_retries
694         self.project_object = project_object
695         self.project_object_file = None
696         self.project_uuid = project_object['uuid']
697         self._poll = poll
698         self._poll_time = poll_time
699         self._updating_lock = threading.Lock()
700         self._current_user = None
701
702     def createDirectory(self, i):
703         if collection_uuid_pattern.match(i['uuid']):
704             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
705         elif group_uuid_pattern.match(i['uuid']):
706             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
707         elif link_uuid_pattern.match(i['uuid']):
708             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
709                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
710             else:
711                 return None
712         elif uuid_pattern.match(i['uuid']):
713             return ObjectFile(self.parent_inode, i)
714         else:
715             return None
716
    def uuid(self):
        """Return the uuid of the project this directory represents."""
        return self.project_uuid
719
720     @use_counter
721     def update(self):
722         if self.project_object_file == None:
723             self.project_object_file = ObjectFile(self.inode, self.project_object)
724             self.inodes.add_entry(self.project_object_file)
725
726         def namefn(i):
727             if 'name' in i:
728                 if i['name'] is None or len(i['name']) == 0:
729                     return None
730                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
731                     # collection or subproject
732                     return i['name']
733                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
734                     # name link
735                     return i['name']
736                 elif 'kind' in i and i['kind'].startswith('arvados#'):
737                     # something else
738                     return "{}.{}".format(i['name'], i['kind'][8:])
739             else:
740                 return None
741
742         def samefn(a, i):
743             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
744                 return a.uuid() == i['uuid']
745             elif isinstance(a, ObjectFile):
746                 return a.uuid() == i['uuid'] and not a.stale()
747             return False
748
749         try:
750             with llfuse.lock_released:
751                 self._updating_lock.acquire()
752                 if not self.stale():
753                     return
754
755                 if group_uuid_pattern.match(self.project_uuid):
756                     self.project_object = self.api.groups().get(
757                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
758                 elif user_uuid_pattern.match(self.project_uuid):
759                     self.project_object = self.api.users().get(
760                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
761
762                 contents = arvados.util.list_all(self.api.groups().contents,
763                                                  self.num_retries, uuid=self.project_uuid)
764
765             # end with llfuse.lock_released, re-acquire lock
766
767             self.merge(contents,
768                        namefn,
769                        samefn,
770                        self.createDirectory)
771         finally:
772             self._updating_lock.release()
773
774     @use_counter
775     @check_update
776     def __getitem__(self, item):
777         if item == '.arvados#project':
778             return self.project_object_file
779         else:
780             return super(ProjectDirectory, self).__getitem__(item)
781
782     def __contains__(self, k):
783         if k == '.arvados#project':
784             return True
785         else:
786             return super(ProjectDirectory, self).__contains__(k)
787
788     @use_counter
789     @check_update
790     def writable(self):
791         with llfuse.lock_released:
792             if not self._current_user:
793                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
794             return self._current_user["uuid"] in self.project_object["writable_by"]
795
796     def persisted(self):
797         return True
798
799     @use_counter
800     @check_update
801     def mkdir(self, name):
802         try:
803             with llfuse.lock_released:
804                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
805                                                     "name": name,
806                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
807             self.invalidate()
808         except apiclient_errors.Error as error:
809             _logger.error(error)
810             raise llfuse.FUSEError(errno.EEXIST)
811
812     @use_counter
813     @check_update
814     def rmdir(self, name):
815         if name not in self:
816             raise llfuse.FUSEError(errno.ENOENT)
817         if not isinstance(self[name], CollectionDirectory):
818             raise llfuse.FUSEError(errno.EPERM)
819         if len(self[name]) > 0:
820             raise llfuse.FUSEError(errno.ENOTEMPTY)
821         with llfuse.lock_released:
822             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
823         self.invalidate()
824
825     @use_counter
826     @check_update
827     def rename(self, name_old, name_new, src):
828         if not isinstance(src, ProjectDirectory):
829             raise llfuse.FUSEError(errno.EPERM)
830
831         ent = src[name_old]
832
833         if not isinstance(ent, CollectionDirectory):
834             raise llfuse.FUSEError(errno.EPERM)
835
836         if name_new in self:
837             # POSIX semantics for replacing one directory with another is
838             # tricky (the target directory must be empty, the operation must be
839             # atomic which isn't possible with the Arvados API as of this
840             # writing) so don't support that.
841             raise llfuse.FUSEError(errno.EPERM)
842
843         self.api.collections().update(uuid=ent.uuid(),
844                                       body={"owner_uuid": self.uuid(),
845                                             "name": name_new}).execute(num_retries=self.num_retries)
846
847         # Acually move the entry from source directory to this directory.
848         del src._entries[name_old]
849         self._entries[name_new] = ent
850         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
851
852
853 class SharedDirectory(Directory):
854     """A special directory that represents users or groups who have shared projects with me."""
855
856     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
857                  poll=False, poll_time=60):
858         super(SharedDirectory, self).__init__(parent_inode, inodes)
859         self.api = api
860         self.num_retries = num_retries
861         self.current_user = api.users().current().execute(num_retries=num_retries)
862         self._poll = True
863         self._poll_time = poll_time
864
865     @use_counter
866     def update(self):
867         with llfuse.lock_released:
868             all_projects = arvados.util.list_all(
869                 self.api.groups().list, self.num_retries,
870                 filters=[['group_class','=','project']])
871             objects = {}
872             for ob in all_projects:
873                 objects[ob['uuid']] = ob
874
875             roots = []
876             root_owners = {}
877             for ob in all_projects:
878                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
879                     roots.append(ob)
880                     root_owners[ob['owner_uuid']] = True
881
882             lusers = arvados.util.list_all(
883                 self.api.users().list, self.num_retries,
884                 filters=[['uuid','in', list(root_owners)]])
885             lgroups = arvados.util.list_all(
886                 self.api.groups().list, self.num_retries,
887                 filters=[['uuid','in', list(root_owners)]])
888
889             users = {}
890             groups = {}
891
892             for l in lusers:
893                 objects[l["uuid"]] = l
894             for l in lgroups:
895                 objects[l["uuid"]] = l
896
897             contents = {}
898             for r in root_owners:
899                 if r in objects:
900                     obr = objects[r]
901                     if obr.get("name"):
902                         contents[obr["name"]] = obr
903                     #elif obr.get("username"):
904                     #    contents[obr["username"]] = obr
905                     elif "first_name" in obr:
906                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
907
908
909             for r in roots:
910                 if r['owner_uuid'] not in objects:
911                     contents[r['name']] = r
912
913         # end with llfuse.lock_released, re-acquire lock
914
915         try:
916             self.merge(contents.items(),
917                        lambda i: i[0],
918                        lambda a, i: a.uuid() == i[1]['uuid'],
919                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
920         except Exception:
921             _logger.exception()