Merge branch '8281-arv-mount-retry' closes #8281
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
_logger = logging.getLogger('arvados.arvados_fuse')


# Pattern for the characters that neither FUSE nor Linux can carry inside a
# single filename: NUL and the path separator.  (If present in a collection
# filename, they will appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return `dirty` rewritten so it is usable as a directory entry name.

    None is passed through unchanged.  The empty string and '.' become '_',
    '..' becomes '__', and in any other name every disallowed character is
    replaced with "_".
    """
    if dirty is None:
        return None
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
40
41
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory

        :inodes: the inode table object.  This class calls its touch(),
        add_entry(), del_entry(), invalidate_entry() and invalidate_inode()
        methods and reads its `encoding` attribute.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # Starts unset; nothing in this class assigns a real inode number.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overriden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Generic directories report themselves as not persisted."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An apiclient HttpError raised by update() is logged as a warning and
        swallowed, so the existing entries stay in place.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named `item`; raises KeyError if absent."""
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list snapshot of (name, entry) pairs."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if an entry named `k` exists."""
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        """Return the number of entries."""
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        # (whatever is still in oldentries was not carried over above)
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Does nothing and returns False when the directory is in use, unless
        `force` is true.  If any child's clear() returns a false value the
        original entries dict is restored and False is returned.  Returns
        True once every entry has been removed.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            # First pass: ask every child to clear itself.
            for n in oldentries:
                if not oldentries[n].clear(force):
                    self._entries = oldentries
                    return False
            # Second pass: drop the children from the inode table.
            for n in oldentries:
                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                self.inodes.del_entry(oldentries[n])
            self.inodes.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        """Return the stored modification time."""
        return self._mtime

    def writable(self):
        """Generic directories are read-only."""
        return False

    def flush(self):
        """No-op here; overridden by subclasses that have state to save."""
        pass

    # The mutating operations below are not supported by the generic
    # directory; subclasses override the ones they implement.
    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
201
202
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, collection):
        """Wrap `collection` as a directory under `parent_inode`."""
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Attach `item` to this directory under the sanitized `name`.

        An item that already carries a fuse_entry is reparented (the entry
        must be marked dead and still have an inode); a collection item gets
        a new populated CollectionDirectoryBase; anything else becomes a
        FuseArvadosFile.  The resulting entry is recorded on
        item.fuse_entry.
        """
        name = sanitize_filename(name)
        has_fuse_entry = hasattr(item, "fuse_entry") and item.fuse_entry is not None
        if has_fuse_entry:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, item)
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            fileobj = FuseArvadosFile(self.inode, item, mtime)
            self._entries[name] = self.inodes.add_entry(fileobj)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback: mirror ADD/DEL/MOD into FUSE state.

        Events for a collection other than self.collection are ignored.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries[name]
                del self._entries[name]
                self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                # Prefer the entry recorded on the item; fall back to a
                # lookup by name.
                if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                    self.inodes.invalidate_inode(item.fuse_entry.inode)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Set mtime, subscribe to collection events, and add all items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry_name, entry_item in self.collection.items():
            self.new_entry(entry_name, entry_item, self.mtime())

    def writable(self):
        """Delegate to the underlying collection."""
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection with llfuse.lock released."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` in the collection."""
        with llfuse.lock_released:
            handle = self.collection.open(name, "w")
            handle.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subcollection `name` in the collection."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this one.

        Raises llfuse.FUSEError with EPERM if `src` is not a collection
        directory, ENOTEMPTY when replacing a non-empty directory with a
        directory, ENOTDIR when replacing a file with a directory, and
        EISDIR when replacing a directory with a file.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            ent_is_file = isinstance(ent, FuseArvadosFile)
            ent_is_dir = isinstance(ent, CollectionDirectoryBase)
            tgt_is_file = isinstance(tgt, FuseArvadosFile)
            tgt_is_dir = isinstance(tgt, CollectionDirectoryBase)
            if ent_is_file and tgt_is_file:
                # file over file: plain overwrite
                pass
            elif ent_is_dir and tgt_is_dir:
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif ent_is_dir and tgt_is_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif ent_is_file and tgt_is_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()
319
320
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create the directory without loading the collection yet.

        :collection_record: either a dict, whose 'uuid' becomes the locator
        and whose 'modified_at' becomes the mtime, or a locator value used
        as-is (may be None).

        :explicit_collection: accepted for interface compatibility; not used
        by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            # Log the caught exception directly: `sys` is not imported in
            # this module, so sys.exc_info() would itself raise NameError.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", e)
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot hit a missing
        # attribute when no locator was supplied.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` matches this locator by uuid or PDH."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection; before loading, use the locator guess."""
        return self.collection.writable() if self.collection is not None else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are force-cleared once this directory has an inode.
        When the record is non-empty, mtime and locator are taken from it and
        an existing record file is updated in place.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True when the directory is up to date afterwards.  On any
        error the problem is logged, the directory is invalidated and False
        is returned.
        """
        try:
            # A loaded collection addressed by portable data hash needs no
            # refresh.
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Already fresh by the time we hold the lock: report
                        # success, not None.
                        return True

                    _logger.debug("Updating %s", to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        # Only reached after one of the handlers above ran.
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return an entry; '.arvados#collection' yields the record file.

        The record file is created and registered on first access.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#collection' always exists; otherwise defer to the base."""
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """True when this directory has a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()
484
485
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save() and save_new() do nothing."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        """Create an empty scratch collection and populate the directory."""
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as usual, then invalidate the record file."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict from the collection's current state.

        Computed with llfuse.lock released; 'uuid' is always None.
        """
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
            }

    def __contains__(self, k):
        """'.arvados#collection' always exists; otherwise defer to the base."""
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        """Return an entry; '.arvados#collection' yields the record file.

        The record file is created and registered on first access.
        """
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        """Scratch collections are never persisted."""
        return False

    def writable(self):
        """Scratch collections are always writable."""
        return True

    def finalize(self):
        """Stop the collection's threads; nothing is saved."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file, if any, then this directory."""
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
554
555
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """Create the directory.

        :pdh_only: when true, only names matching the portable data hash
        pattern are looked up; uuid-style names are rejected.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        """Set the attribute; the first real inode assignment adds README."""
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        # (The `inode is not None` test short-circuits before _entries is
        # read, so the initial `inode = None` assignment is safe.)
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if `k` names a known or loadable collection.

        Names already present are accepted immediately.  Other names must
        match the portable data hash pattern or, unless pdh_only is set, the
        uuid pattern.  A matching name is loaded through a temporary
        CollectionDirectory, which is kept only if its update() succeeds.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Bind before the try so the handler can tell whether an entry was
        # ever registered; otherwise a failure while constructing it would
        # raise NameError inside the handler.
        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Entry appeared while we were loading; drop the duplicate.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self.inode, k)
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`, loading it on demand.

        Raises KeyError if no such collection can be found.
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        """Intentionally a no-op: entries are never cleared."""
        pass
631
632
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates each entry it holds."""

    def invalidate(self):
        """Invalidate this directory and every entry in it.

        Any exception is logged with its traceback and swallowed.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # with none raises TypeError from inside this handler.
            _logger.exception("Error invalidating directory inode %s", self.inode)
641
642
class TagsDirectory(RecursiveInvalidateDirectory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        """Create the directory; polling is always enabled."""
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """List distinct tag link names and merge them in as TagDirectory entries.

        The API request runs with llfuse.lock released.  Nothing is merged
        if the response has no "items" key.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                )
            tags = request.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(i):
            return i['name']

        def same_tag(a, i):
            return a.tag == i['name']

        def make_tag_directory(i):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, same_tag, make_tag_directory)
665
666
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """Create the directory for the tag named `tag`."""
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """List collections carrying this tag and merge them in.

        The API request runs with llfuse.lock released.  Entries are named
        by the link's head_uuid.
        """
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid']
                )
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(i):
            return i['head_uuid']

        def same_collection(a, i):
            return a.collection_locator == i['head_uuid']

        def make_collection_directory(i):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name,
                   same_collection,
                   make_collection_directory)
694
695
696 class ProjectDirectory(Directory):
697     """A special directory that contains the contents of a project."""
698
699     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
700                  poll=False, poll_time=60):
701         super(ProjectDirectory, self).__init__(parent_inode, inodes)
702         self.api = api
703         self.num_retries = num_retries
704         self.project_object = project_object
705         self.project_object_file = None
706         self.project_uuid = project_object['uuid']
707         self._poll = poll
708         self._poll_time = poll_time
709         self._updating_lock = threading.Lock()
710         self._current_user = None
711
712     def createDirectory(self, i):
713         if collection_uuid_pattern.match(i['uuid']):
714             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
715         elif group_uuid_pattern.match(i['uuid']):
716             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
717         elif link_uuid_pattern.match(i['uuid']):
718             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
719                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
720             else:
721                 return None
722         elif uuid_pattern.match(i['uuid']):
723             return ObjectFile(self.parent_inode, i)
724         else:
725             return None
726
727     def uuid(self):
728         return self.project_uuid
729
730     @use_counter
731     def update(self):
732         if self.project_object_file == None:
733             self.project_object_file = ObjectFile(self.inode, self.project_object)
734             self.inodes.add_entry(self.project_object_file)
735
736         def namefn(i):
737             if 'name' in i:
738                 if i['name'] is None or len(i['name']) == 0:
739                     return None
740                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
741                     # collection or subproject
742                     return i['name']
743                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
744                     # name link
745                     return i['name']
746                 elif 'kind' in i and i['kind'].startswith('arvados#'):
747                     # something else
748                     return "{}.{}".format(i['name'], i['kind'][8:])
749             else:
750                 return None
751
752         def samefn(a, i):
753             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
754                 return a.uuid() == i['uuid']
755             elif isinstance(a, ObjectFile):
756                 return a.uuid() == i['uuid'] and not a.stale()
757             return False
758
759         try:
760             with llfuse.lock_released:
761                 self._updating_lock.acquire()
762                 if not self.stale():
763                     return
764
765                 if group_uuid_pattern.match(self.project_uuid):
766                     self.project_object = self.api.groups().get(
767                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
768                 elif user_uuid_pattern.match(self.project_uuid):
769                     self.project_object = self.api.users().get(
770                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
771
772                 contents = arvados.util.list_all(self.api.groups().contents,
773                                                  self.num_retries, uuid=self.project_uuid)
774
775             # end with llfuse.lock_released, re-acquire lock
776
777             self.merge(contents,
778                        namefn,
779                        samefn,
780                        self.createDirectory)
781         finally:
782             self._updating_lock.release()
783
784     @use_counter
785     @check_update
786     def __getitem__(self, item):
787         if item == '.arvados#project':
788             return self.project_object_file
789         else:
790             return super(ProjectDirectory, self).__getitem__(item)
791
792     def __contains__(self, k):
793         if k == '.arvados#project':
794             return True
795         else:
796             return super(ProjectDirectory, self).__contains__(k)
797
798     @use_counter
799     @check_update
800     def writable(self):
801         with llfuse.lock_released:
802             if not self._current_user:
803                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
804             return self._current_user["uuid"] in self.project_object["writable_by"]
805
806     def persisted(self):
807         return True
808
809     @use_counter
810     @check_update
811     def mkdir(self, name):
812         try:
813             with llfuse.lock_released:
814                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
815                                                     "name": name,
816                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
817             self.invalidate()
818         except apiclient_errors.Error as error:
819             _logger.error(error)
820             raise llfuse.FUSEError(errno.EEXIST)
821
822     @use_counter
823     @check_update
824     def rmdir(self, name):
825         if name not in self:
826             raise llfuse.FUSEError(errno.ENOENT)
827         if not isinstance(self[name], CollectionDirectory):
828             raise llfuse.FUSEError(errno.EPERM)
829         if len(self[name]) > 0:
830             raise llfuse.FUSEError(errno.ENOTEMPTY)
831         with llfuse.lock_released:
832             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
833         self.invalidate()
834
835     @use_counter
836     @check_update
837     def rename(self, name_old, name_new, src):
838         if not isinstance(src, ProjectDirectory):
839             raise llfuse.FUSEError(errno.EPERM)
840
841         ent = src[name_old]
842
843         if not isinstance(ent, CollectionDirectory):
844             raise llfuse.FUSEError(errno.EPERM)
845
846         if name_new in self:
847             # POSIX semantics for replacing one directory with another is
848             # tricky (the target directory must be empty, the operation must be
849             # atomic which isn't possible with the Arvados API as of this
850             # writing) so don't support that.
851             raise llfuse.FUSEError(errno.EPERM)
852
853         self.api.collections().update(uuid=ent.uuid(),
854                                       body={"owner_uuid": self.uuid(),
855                                             "name": name_new}).execute(num_retries=self.num_retries)
856
857         # Acually move the entry from source directory to this directory.
858         del src._entries[name_old]
859         self._entries[name_new] = ent
860         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
861
862
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Set up the directory and fetch the current user record.

        parent_inode and inodes are passed through to Directory.__init__.
        api is the API client used for all requests and num_retries is
        handed to every request's execute().  poll_time is stored and
        passed on to the ProjectDirectory children created by update().
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the `poll` argument is ignored (polling is always
        # switched on) and `exclude` is never used.  Left unchanged here;
        # confirm whether that is intentional.
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the list of owners whose projects are shared with the current user.

        With the llfuse lock released: list every project, keep as "roots"
        those whose owner is neither the current user nor another listed
        project, then look the root owners up among users and groups.
        Each owner that was found becomes an entry named after its "name"
        (or "first_name last_name" when it has no name); roots whose owner
        could not be found are listed directly under their own name.

        The result is merged into this directory, creating a
        ProjectDirectory for each new entry.  Exceptions raised by the
        merge are logged rather than propagated.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {ob['uuid']: ob for ob in all_projects}

            roots = []
            # dict used as a set of owner uuids
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for owner in lusers:
                objects[owner["uuid"]] = owner
            for owner in lgroups:
                objects[owner["uuid"]] = owner

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if obr.get("name"):
                        contents[obr["name"]] = obr
                    #elif obr.get("username"):
                    #    contents[obr["username"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            # Projects whose owner record could not be retrieved are listed
            # directly, under the project's own name.
            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # without one raises TypeError from inside this handler.
            _logger.exception("arv-mount shared dir error")