Merge branch 'master' into 7661-fuse-by-pdh
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
14
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
17
18 _logger = logging.getLogger('arvados.arvados_fuse')
19
20
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
24 _disallowed_filename_characters = re.compile('[\x00/]')
25
26 # '.' and '..' are not reachable if API server is newer than #6277
27 def sanitize_filename(dirty):
28     """Replace disallowed filename characters with harmless "_"."""
29     if dirty is None:
30         return None
31     elif dirty == '':
32         return '_'
33     elif dirty == '.':
34         return '_'
35     elif dirty == '..':
36         return '__'
37     else:
38         return _disallowed_filename_characters.sub('_', dirty)
39
40
41 class Directory(FreshBase):
42     """Generic directory object, backed by a dict.
43
44     Consists of a set of entries with the key representing the filename
45     and the value referencing a File or Directory object.
46     """
47
48     def __init__(self, parent_inode, inodes):
49         """parent_inode is the integer inode number"""
50
51         super(Directory, self).__init__()
52
53         self.inode = None
54         if not isinstance(parent_inode, int):
55             raise Exception("parent_inode should be an int")
56         self.parent_inode = parent_inode
57         self.inodes = inodes
58         self._entries = {}
59         self._mtime = time.time()
60
61     #  Overriden by subclasses to implement logic to update the entries dict
62     #  when the directory is stale
63     @use_counter
64     def update(self):
65         pass
66
67     # Only used when computing the size of the disk footprint of the directory
68     # (stub)
69     def size(self):
70         return 0
71
72     def persisted(self):
73         return False
74
75     def checkupdate(self):
76         if self.stale():
77             try:
78                 self.update()
79             except apiclient.errors.HttpError as e:
80                 _logger.warn(e)
81
82     @use_counter
83     @check_update
84     def __getitem__(self, item):
85         return self._entries[item]
86
87     @use_counter
88     @check_update
89     def items(self):
90         return list(self._entries.items())
91
92     @use_counter
93     @check_update
94     def __contains__(self, k):
95         return k in self._entries
96
97     @use_counter
98     @check_update
99     def __len__(self):
100         return len(self._entries)
101
102     def fresh(self):
103         self.inodes.touch(self)
104         super(Directory, self).fresh()
105
106     def merge(self, items, fn, same, new_entry):
107         """Helper method for updating the contents of the directory.
108
109         Takes a list describing the new contents of the directory, reuse
110         entries that are the same in both the old and new lists, create new
111         entries, and delete old entries missing from the new list.
112
113         :items: iterable with new directory contents
114
115         :fn: function to take an entry in 'items' and return the desired file or
116         directory name, or None if this entry should be skipped
117
118         :same: function to compare an existing entry (a File or Directory
119         object) with an entry in the items list to determine whether to keep
120         the existing entry.
121
122         :new_entry: function to create a new directory entry (File or Directory
123         object) from an entry in the items list.
124
125         """
126
127         oldentries = self._entries
128         self._entries = {}
129         changed = False
130         for i in items:
131             name = sanitize_filename(fn(i))
132             if name:
133                 if name in oldentries and same(oldentries[name], i):
134                     # move existing directory entry over
135                     self._entries[name] = oldentries[name]
136                     del oldentries[name]
137                 else:
138                     _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
139                     # create new directory entry
140                     ent = new_entry(i)
141                     if ent is not None:
142                         self._entries[name] = self.inodes.add_entry(ent)
143                         changed = True
144
145         # delete any other directory entries that were not in found in 'items'
146         for i in oldentries:
147             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
148             self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
149             self.inodes.del_entry(oldentries[i])
150             changed = True
151
152         if changed:
153             self.inodes.invalidate_inode(self.inode)
154             self._mtime = time.time()
155
156         self.fresh()
157
158     def clear(self, force=False):
159         """Delete all entries"""
160
161         if not self.in_use() or force:
162             oldentries = self._entries
163             self._entries = {}
164             for n in oldentries:
165                 if not oldentries[n].clear(force):
166                     self._entries = oldentries
167                     return False
168             for n in oldentries:
169                 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
170                 self.inodes.del_entry(oldentries[n])
171             self.inodes.invalidate_inode(self.inode)
172             self.invalidate()
173             return True
174         else:
175             return False
176
177     def mtime(self):
178         return self._mtime
179
180     def writable(self):
181         return False
182
183     def flush(self):
184         pass
185
186     def create(self, name):
187         raise NotImplementedError()
188
189     def mkdir(self, name):
190         raise NotImplementedError()
191
192     def unlink(self, name):
193         raise NotImplementedError()
194
195     def rmdir(self, name):
196         raise NotImplementedError()
197
198     def rename(self, name_old, name_new, src):
199         raise NotImplementedError()
200
201
202 class CollectionDirectoryBase(Directory):
203     """Represent an Arvados Collection as a directory.
204
205     This class is used for Subcollections, and is also the base class for
206     CollectionDirectory, which implements collection loading/saving on
207     Collection records.
208
209     Most operations act only the underlying Arvados `Collection` object.  The
210     `Collection` object signals via a notify callback to
211     `CollectionDirectoryBase.on_event` that an item was added, removed or
212     modified.  FUSE inodes and directory entries are created, deleted or
213     invalidated in response to these events.
214
215     """
216
217     def __init__(self, parent_inode, inodes, collection):
218         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
219         self.collection = collection
220
221     def new_entry(self, name, item, mtime):
222         name = sanitize_filename(name)
223         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
224             if item.fuse_entry.dead is not True:
225                 raise Exception("Can only reparent dead inode entry")
226             if item.fuse_entry.inode is None:
227                 raise Exception("Reparented entry must still have valid inode")
228             item.fuse_entry.dead = False
229             self._entries[name] = item.fuse_entry
230         elif isinstance(item, arvados.collection.RichCollectionBase):
231             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
232             self._entries[name].populate(mtime)
233         else:
234             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
235         item.fuse_entry = self._entries[name]
236
237     def on_event(self, event, collection, name, item):
238         if collection == self.collection:
239             name = sanitize_filename(name)
240             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
241             with llfuse.lock:
242                 if event == arvados.collection.ADD:
243                     self.new_entry(name, item, self.mtime())
244                 elif event == arvados.collection.DEL:
245                     ent = self._entries[name]
246                     del self._entries[name]
247                     self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
248                     self.inodes.del_entry(ent)
249                 elif event == arvados.collection.MOD:
250                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
251                         self.inodes.invalidate_inode(item.fuse_entry.inode)
252                     elif name in self._entries:
253                         self.inodes.invalidate_inode(self._entries[name].inode)
254
255     def populate(self, mtime):
256         self._mtime = mtime
257         self.collection.subscribe(self.on_event)
258         for entry, item in self.collection.items():
259             self.new_entry(entry, item, self.mtime())
260
261     def writable(self):
262         return self.collection.writable()
263
264     @use_counter
265     def flush(self):
266         with llfuse.lock_released:
267             self.collection.root_collection().save()
268
269     @use_counter
270     @check_update
271     def create(self, name):
272         with llfuse.lock_released:
273             self.collection.open(name, "w").close()
274
275     @use_counter
276     @check_update
277     def mkdir(self, name):
278         with llfuse.lock_released:
279             self.collection.mkdirs(name)
280
281     @use_counter
282     @check_update
283     def unlink(self, name):
284         with llfuse.lock_released:
285             self.collection.remove(name)
286         self.flush()
287
288     @use_counter
289     @check_update
290     def rmdir(self, name):
291         with llfuse.lock_released:
292             self.collection.remove(name)
293         self.flush()
294
295     @use_counter
296     @check_update
297     def rename(self, name_old, name_new, src):
298         if not isinstance(src, CollectionDirectoryBase):
299             raise llfuse.FUSEError(errno.EPERM)
300
301         if name_new in self:
302             ent = src[name_old]
303             tgt = self[name_new]
304             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
305                 pass
306             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
307                 if len(tgt) > 0:
308                     raise llfuse.FUSEError(errno.ENOTEMPTY)
309             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
310                 raise llfuse.FUSEError(errno.ENOTDIR)
311             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
312                 raise llfuse.FUSEError(errno.EISDIR)
313
314         with llfuse.lock_released:
315             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
316         self.flush()
317         src.flush()
318
319
320 class CollectionDirectory(CollectionDirectoryBase):
321     """Represents the root of a directory tree representing a collection."""
322
323     def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
324         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
325         self.api = api
326         self.num_retries = num_retries
327         self.collection_record_file = None
328         self.collection_record = None
329         if isinstance(collection_record, dict):
330             self.collection_locator = collection_record['uuid']
331             self._mtime = convertTime(collection_record.get('modified_at'))
332         else:
333             self.collection_locator = collection_record
334             self._mtime = 0
335         self._manifest_size = 0
336         if self.collection_locator:
337             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
338         self._updating_lock = threading.Lock()
339
340     def same(self, i):
341         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
342
343     def writable(self):
344         return self.collection.writable() if self.collection is not None else self._writable
345
346     # Used by arv-web.py to switch the contents of the CollectionDirectory
347     def change_collection(self, new_locator):
348         """Switch the contents of the CollectionDirectory.
349
350         Must be called with llfuse.lock held.
351         """
352
353         self.collection_locator = new_locator
354         self.collection_record = None
355         self.update()
356
357     def new_collection(self, new_collection_record, coll_reader):
358         if self.inode:
359             self.clear(force=True)
360
361         self.collection_record = new_collection_record
362
363         if self.collection_record:
364             self._mtime = convertTime(self.collection_record.get('modified_at'))
365             self.collection_locator = self.collection_record["uuid"]
366             if self.collection_record_file is not None:
367                 self.collection_record_file.update(self.collection_record)
368
369         self.collection = coll_reader
370         self.populate(self.mtime())
371
372     def uuid(self):
373         return self.collection_locator
374
375     @use_counter
376     def update(self, to_record_version=None):
377         try:
378             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
379                 return True
380
381             if self.collection_locator is None:
382                 self.fresh()
383                 return True
384
385             try:
386                 with llfuse.lock_released:
387                     self._updating_lock.acquire()
388                     if not self.stale():
389                         return
390
391                     _logger.debug("Updating %s", to_record_version)
392                     if self.collection is not None:
393                         if self.collection.known_past_version(to_record_version):
394                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
395                         else:
396                             self.collection.update()
397                     else:
398                         if uuid_pattern.match(self.collection_locator):
399                             coll_reader = arvados.collection.Collection(
400                                 self.collection_locator, self.api, self.api.keep,
401                                 num_retries=self.num_retries)
402                         else:
403                             coll_reader = arvados.collection.CollectionReader(
404                                 self.collection_locator, self.api, self.api.keep,
405                                 num_retries=self.num_retries)
406                         new_collection_record = coll_reader.api_response() or {}
407                         # If the Collection only exists in Keep, there will be no API
408                         # response.  Fill in the fields we need.
409                         if 'uuid' not in new_collection_record:
410                             new_collection_record['uuid'] = self.collection_locator
411                         if "portable_data_hash" not in new_collection_record:
412                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
413                         if 'manifest_text' not in new_collection_record:
414                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
415
416                         if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
417                             self.new_collection(new_collection_record, coll_reader)
418
419                         self._manifest_size = len(coll_reader.manifest_text())
420                         _logger.debug("%s manifest_size %i", self, self._manifest_size)
421                 # end with llfuse.lock_released, re-acquire lock
422
423                 self.fresh()
424                 return True
425             finally:
426                 self._updating_lock.release()
427         except arvados.errors.NotFoundError as e:
428             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
429         except arvados.errors.ArgumentError as detail:
430             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
431             if self.collection_record is not None and "manifest_text" in self.collection_record:
432                 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
433         except Exception:
434             _logger.exception("arv-mount %s: error", self.collection_locator)
435             if self.collection_record is not None and "manifest_text" in self.collection_record:
436                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
437         return False
438
439     @use_counter
440     @check_update
441     def __getitem__(self, item):
442         if item == '.arvados#collection':
443             if self.collection_record_file is None:
444                 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
445                 self.inodes.add_entry(self.collection_record_file)
446             return self.collection_record_file
447         else:
448             return super(CollectionDirectory, self).__getitem__(item)
449
450     def __contains__(self, k):
451         if k == '.arvados#collection':
452             return True
453         else:
454             return super(CollectionDirectory, self).__contains__(k)
455
456     def invalidate(self):
457         self.collection_record = None
458         self.collection_record_file = None
459         super(CollectionDirectory, self).invalidate()
460
461     def persisted(self):
462         return (self.collection_locator is not None)
463
464     def objsize(self):
465         # This is an empirically-derived heuristic to estimate the memory used
466         # to store this collection's metadata.  Calculating the memory
467         # footprint directly would be more accurate, but also more complicated.
468         return self._manifest_size * 128
469
470     def finalize(self):
471         if self.collection is not None:
472             if self.writable():
473                 self.collection.save()
474             self.collection.stop_threads()
475
476
477 class MagicDirectory(Directory):
478     """A special directory that logically contains the set of all extant keep locators.
479
480     When a file is referenced by lookup(), it is tested to see if it is a valid
481     keep locator to a manifest, and if so, loads the manifest contents as a
482     subdirectory of this directory with the locator as the directory name.
483     Since querying a list of all extant keep locators is impractical, only
484     collections that have already been accessed are visible to readdir().
485
486     """
487
488     README_TEXT = """
489 This directory provides access to Arvados collections as subdirectories listed
490 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
491 the form '1234567890abcdefghijklmnopqrstuv+123').
492
493 Note that this directory will appear empty until you attempt to access a
494 specific collection subdirectory (such as trying to 'cd' into it), at which
495 point the collection will actually be looked up on the server and the directory
496 will appear if it exists.
497 """.lstrip()
498
499     def __init__(self, parent_inode, inodes, api, num_retries, by_pdh=False):
500         super(MagicDirectory, self).__init__(parent_inode, inodes)
501         self.api = api
502         self.num_retries = num_retries
503         self.by_pdh = by_pdh
504
505     def __setattr__(self, name, value):
506         super(MagicDirectory, self).__setattr__(name, value)
507         # When we're assigned an inode, add a README.
508         if ((name == 'inode') and (self.inode is not None) and
509               (not self._entries)):
510             self._entries['README'] = self.inodes.add_entry(
511                 StringFile(self.inode, self.README_TEXT, time.time()))
512             # If we're the root directory, add an identical by_id subdirectory.
513             if self.inode == llfuse.ROOT_INODE:
514                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
515                         self.inode, self.inodes, self.api, self.num_retries))
516
517     def __contains__(self, k):
518         if k in self._entries:
519             return True
520
521         if self.by_pdh and uuid_pattern.match(k):
522             raise llfuse.FUSEError(errno.ENOENT)
523
524         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
525             return False
526
527         try:
528             e = self.inodes.add_entry(CollectionDirectory(
529                     self.inode, self.inodes, self.api, self.num_retries, k))
530
531             if e.update():
532                 if k not in self._entries:
533                     self._entries[k] = e
534                 else:
535                     self.inodes.del_entry(e)
536                 return True
537             else:
538                 self.inodes.del_entry(e)
539                 return False
540         except Exception as e:
541             _logger.debug('arv-mount exception keep %s', e)
542             self.inodes.del_entry(e)
543             return False
544
545     def __getitem__(self, item):
546         if item in self:
547             return self._entries[item]
548         else:
549             raise KeyError("No collection with id " + item)
550
551     def clear(self, force=False):
552         pass
553
554
555 class RecursiveInvalidateDirectory(Directory):
556     def invalidate(self):
557         try:
558             super(RecursiveInvalidateDirectory, self).invalidate()
559             for a in self._entries:
560                 self._entries[a].invalidate()
561         except Exception:
562             _logger.exception()
563
564
565 class TagsDirectory(RecursiveInvalidateDirectory):
566     """A special directory that contains as subdirectories all tags visible to the user."""
567
568     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
569         super(TagsDirectory, self).__init__(parent_inode, inodes)
570         self.api = api
571         self.num_retries = num_retries
572         self._poll = True
573         self._poll_time = poll_time
574
575     @use_counter
576     def update(self):
577         with llfuse.lock_released:
578             tags = self.api.links().list(
579                 filters=[['link_class', '=', 'tag']],
580                 select=['name'], distinct=True
581                 ).execute(num_retries=self.num_retries)
582         if "items" in tags:
583             self.merge(tags['items'],
584                        lambda i: i['name'],
585                        lambda a, i: a.tag == i['name'],
586                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
587
588
589 class TagDirectory(Directory):
590     """A special directory that contains as subdirectories all collections visible
591     to the user that are tagged with a particular tag.
592     """
593
594     def __init__(self, parent_inode, inodes, api, num_retries, tag,
595                  poll=False, poll_time=60):
596         super(TagDirectory, self).__init__(parent_inode, inodes)
597         self.api = api
598         self.num_retries = num_retries
599         self.tag = tag
600         self._poll = poll
601         self._poll_time = poll_time
602
603     @use_counter
604     def update(self):
605         with llfuse.lock_released:
606             taggedcollections = self.api.links().list(
607                 filters=[['link_class', '=', 'tag'],
608                          ['name', '=', self.tag],
609                          ['head_uuid', 'is_a', 'arvados#collection']],
610                 select=['head_uuid']
611                 ).execute(num_retries=self.num_retries)
612         self.merge(taggedcollections['items'],
613                    lambda i: i['head_uuid'],
614                    lambda a, i: a.collection_locator == i['head_uuid'],
615                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
616
617
618 class ProjectDirectory(Directory):
619     """A special directory that contains the contents of a project."""
620
621     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
622                  poll=False, poll_time=60):
623         super(ProjectDirectory, self).__init__(parent_inode, inodes)
624         self.api = api
625         self.num_retries = num_retries
626         self.project_object = project_object
627         self.project_object_file = None
628         self.project_uuid = project_object['uuid']
629         self._poll = poll
630         self._poll_time = poll_time
631         self._updating_lock = threading.Lock()
632         self._current_user = None
633
634     def createDirectory(self, i):
635         if collection_uuid_pattern.match(i['uuid']):
636             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
637         elif group_uuid_pattern.match(i['uuid']):
638             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
639         elif link_uuid_pattern.match(i['uuid']):
640             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
641                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
642             else:
643                 return None
644         elif uuid_pattern.match(i['uuid']):
645             return ObjectFile(self.parent_inode, i)
646         else:
647             return None
648
649     def uuid(self):
650         return self.project_uuid
651
652     @use_counter
653     def update(self):
654         if self.project_object_file == None:
655             self.project_object_file = ObjectFile(self.inode, self.project_object)
656             self.inodes.add_entry(self.project_object_file)
657
658         def namefn(i):
659             if 'name' in i:
660                 if i['name'] is None or len(i['name']) == 0:
661                     return None
662                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
663                     # collection or subproject
664                     return i['name']
665                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
666                     # name link
667                     return i['name']
668                 elif 'kind' in i and i['kind'].startswith('arvados#'):
669                     # something else
670                     return "{}.{}".format(i['name'], i['kind'][8:])
671             else:
672                 return None
673
674         def samefn(a, i):
675             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
676                 return a.uuid() == i['uuid']
677             elif isinstance(a, ObjectFile):
678                 return a.uuid() == i['uuid'] and not a.stale()
679             return False
680
681         try:
682             with llfuse.lock_released:
683                 self._updating_lock.acquire()
684                 if not self.stale():
685                     return
686
687                 if group_uuid_pattern.match(self.project_uuid):
688                     self.project_object = self.api.groups().get(
689                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
690                 elif user_uuid_pattern.match(self.project_uuid):
691                     self.project_object = self.api.users().get(
692                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
693
694                 contents = arvados.util.list_all(self.api.groups().contents,
695                                                  self.num_retries, uuid=self.project_uuid)
696
697             # end with llfuse.lock_released, re-acquire lock
698
699             self.merge(contents,
700                        namefn,
701                        samefn,
702                        self.createDirectory)
703         finally:
704             self._updating_lock.release()
705
706     @use_counter
707     @check_update
708     def __getitem__(self, item):
709         if item == '.arvados#project':
710             return self.project_object_file
711         else:
712             return super(ProjectDirectory, self).__getitem__(item)
713
714     def __contains__(self, k):
715         if k == '.arvados#project':
716             return True
717         else:
718             return super(ProjectDirectory, self).__contains__(k)
719
720     @use_counter
721     @check_update
722     def writable(self):
723         with llfuse.lock_released:
724             if not self._current_user:
725                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
726             return self._current_user["uuid"] in self.project_object["writable_by"]
727
728     def persisted(self):
729         return True
730
731     @use_counter
732     @check_update
733     def mkdir(self, name):
734         try:
735             with llfuse.lock_released:
736                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
737                                                     "name": name,
738                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
739             self.invalidate()
740         except apiclient_errors.Error as error:
741             _logger.error(error)
742             raise llfuse.FUSEError(errno.EEXIST)
743
744     @use_counter
745     @check_update
746     def rmdir(self, name):
747         if name not in self:
748             raise llfuse.FUSEError(errno.ENOENT)
749         if not isinstance(self[name], CollectionDirectory):
750             raise llfuse.FUSEError(errno.EPERM)
751         if len(self[name]) > 0:
752             raise llfuse.FUSEError(errno.ENOTEMPTY)
753         with llfuse.lock_released:
754             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
755         self.invalidate()
756
757     @use_counter
758     @check_update
759     def rename(self, name_old, name_new, src):
760         if not isinstance(src, ProjectDirectory):
761             raise llfuse.FUSEError(errno.EPERM)
762
763         ent = src[name_old]
764
765         if not isinstance(ent, CollectionDirectory):
766             raise llfuse.FUSEError(errno.EPERM)
767
768         if name_new in self:
769             # POSIX semantics for replacing one directory with another is
770             # tricky (the target directory must be empty, the operation must be
771             # atomic which isn't possible with the Arvados API as of this
772             # writing) so don't support that.
773             raise llfuse.FUSEError(errno.EPERM)
774
775         self.api.collections().update(uuid=ent.uuid(),
776                                       body={"owner_uuid": self.uuid(),
777                                             "name": name_new}).execute(num_retries=self.num_retries)
778
779         # Acually move the entry from source directory to this directory.
780         del src._entries[name_old]
781         self._entries[name_new] = ent
782         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
783
784
785 class SharedDirectory(Directory):
786     """A special directory that represents users or groups who have shared projects with me."""
787
788     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
789                  poll=False, poll_time=60):
790         super(SharedDirectory, self).__init__(parent_inode, inodes)
791         self.api = api
792         self.num_retries = num_retries
793         self.current_user = api.users().current().execute(num_retries=num_retries)
794         self._poll = True
795         self._poll_time = poll_time
796
797     @use_counter
798     def update(self):
799         with llfuse.lock_released:
800             all_projects = arvados.util.list_all(
801                 self.api.groups().list, self.num_retries,
802                 filters=[['group_class','=','project']])
803             objects = {}
804             for ob in all_projects:
805                 objects[ob['uuid']] = ob
806
807             roots = []
808             root_owners = {}
809             for ob in all_projects:
810                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
811                     roots.append(ob)
812                     root_owners[ob['owner_uuid']] = True
813
814             lusers = arvados.util.list_all(
815                 self.api.users().list, self.num_retries,
816                 filters=[['uuid','in', list(root_owners)]])
817             lgroups = arvados.util.list_all(
818                 self.api.groups().list, self.num_retries,
819                 filters=[['uuid','in', list(root_owners)]])
820
821             users = {}
822             groups = {}
823
824             for l in lusers:
825                 objects[l["uuid"]] = l
826             for l in lgroups:
827                 objects[l["uuid"]] = l
828
829             contents = {}
830             for r in root_owners:
831                 if r in objects:
832                     obr = objects[r]
833                     if obr.get("name"):
834                         contents[obr["name"]] = obr
835                     #elif obr.get("username"):
836                     #    contents[obr["username"]] = obr
837                     elif "first_name" in obr:
838                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
839
840
841             for r in roots:
842                 if r['owner_uuid'] not in objects:
843                     contents[r['name']] = r
844
845         # end with llfuse.lock_released, re-acquire lock
846
847         try:
848             self.merge(contents.items(),
849                        lambda i: i[0],
850                        lambda a, i: a.uuid() == i[1]['uuid'],
851                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
852         except Exception:
853             _logger.exception()