5824: Merge branch 'master' into 5824-keep-web-workbench
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
14
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
17
18 _logger = logging.getLogger('arvados.arvados_fuse')
19
20
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
24 _disallowed_filename_characters = re.compile('[\x00/]')
25
26 # '.' and '..' are not reachable if API server is newer than #6277
27 def sanitize_filename(dirty):
28     """Replace disallowed filename characters with harmless "_"."""
29     if dirty is None:
30         return None
31     elif dirty == '':
32         return '_'
33     elif dirty == '.':
34         return '_'
35     elif dirty == '..':
36         return '__'
37     else:
38         return _disallowed_filename_characters.sub('_', dirty)
39
40
41 class Directory(FreshBase):
42     """Generic directory object, backed by a dict.
43
44     Consists of a set of entries with the key representing the filename
45     and the value referencing a File or Directory object.
46     """
47
48     def __init__(self, parent_inode, inodes):
49         """parent_inode is the integer inode number"""
50
51         super(Directory, self).__init__()
52
53         self.inode = None
54         if not isinstance(parent_inode, int):
55             raise Exception("parent_inode should be an int")
56         self.parent_inode = parent_inode
57         self.inodes = inodes
58         self._entries = {}
59         self._mtime = time.time()
60
61     #  Overriden by subclasses to implement logic to update the entries dict
62     #  when the directory is stale
63     @use_counter
64     def update(self):
65         pass
66
67     # Only used when computing the size of the disk footprint of the directory
68     # (stub)
69     def size(self):
70         return 0
71
72     def persisted(self):
73         return False
74
75     def checkupdate(self):
76         if self.stale():
77             try:
78                 self.update()
79             except apiclient.errors.HttpError as e:
80                 _logger.warn(e)
81
82     @use_counter
83     @check_update
84     def __getitem__(self, item):
85         return self._entries[item]
86
87     @use_counter
88     @check_update
89     def items(self):
90         return list(self._entries.items())
91
92     @use_counter
93     @check_update
94     def __contains__(self, k):
95         return k in self._entries
96
97     @use_counter
98     @check_update
99     def __len__(self):
100         return len(self._entries)
101
102     def fresh(self):
103         self.inodes.touch(self)
104         super(Directory, self).fresh()
105
106     def merge(self, items, fn, same, new_entry):
107         """Helper method for updating the contents of the directory.
108
109         Takes a list describing the new contents of the directory, reuse
110         entries that are the same in both the old and new lists, create new
111         entries, and delete old entries missing from the new list.
112
113         :items: iterable with new directory contents
114
115         :fn: function to take an entry in 'items' and return the desired file or
116         directory name, or None if this entry should be skipped
117
118         :same: function to compare an existing entry (a File or Directory
119         object) with an entry in the items list to determine whether to keep
120         the existing entry.
121
122         :new_entry: function to create a new directory entry (File or Directory
123         object) from an entry in the items list.
124
125         """
126
127         oldentries = self._entries
128         self._entries = {}
129         changed = False
130         for i in items:
131             name = sanitize_filename(fn(i))
132             if name:
133                 if name in oldentries and same(oldentries[name], i):
134                     # move existing directory entry over
135                     self._entries[name] = oldentries[name]
136                     del oldentries[name]
137                 else:
138                     _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
139                     # create new directory entry
140                     ent = new_entry(i)
141                     if ent is not None:
142                         self._entries[name] = self.inodes.add_entry(ent)
143                         changed = True
144
145         # delete any other directory entries that were not in found in 'items'
146         for i in oldentries:
147             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
148             self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
149             self.inodes.del_entry(oldentries[i])
150             changed = True
151
152         if changed:
153             self.inodes.invalidate_inode(self.inode)
154             self._mtime = time.time()
155
156         self.fresh()
157
158     def clear(self, force=False):
159         """Delete all entries"""
160
161         if not self.in_use() or force:
162             oldentries = self._entries
163             self._entries = {}
164             for n in oldentries:
165                 if not oldentries[n].clear(force):
166                     self._entries = oldentries
167                     return False
168             for n in oldentries:
169                 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
170                 self.inodes.del_entry(oldentries[n])
171             self.inodes.invalidate_inode(self.inode)
172             self.invalidate()
173             return True
174         else:
175             return False
176
177     def mtime(self):
178         return self._mtime
179
180     def writable(self):
181         return False
182
183     def flush(self):
184         pass
185
186     def create(self, name):
187         raise NotImplementedError()
188
189     def mkdir(self, name):
190         raise NotImplementedError()
191
192     def unlink(self, name):
193         raise NotImplementedError()
194
195     def rmdir(self, name):
196         raise NotImplementedError()
197
198     def rename(self, name_old, name_new, src):
199         raise NotImplementedError()
200
201
202 class CollectionDirectoryBase(Directory):
203     """Represent an Arvados Collection as a directory.
204
205     This class is used for Subcollections, and is also the base class for
206     CollectionDirectory, which implements collection loading/saving on
207     Collection records.
208
209     Most operations act only the underlying Arvados `Collection` object.  The
210     `Collection` object signals via a notify callback to
211     `CollectionDirectoryBase.on_event` that an item was added, removed or
212     modified.  FUSE inodes and directory entries are created, deleted or
213     invalidated in response to these events.
214
215     """
216
217     def __init__(self, parent_inode, inodes, collection):
218         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
219         self.collection = collection
220
221     def new_entry(self, name, item, mtime):
222         name = sanitize_filename(name)
223         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
224             if item.fuse_entry.dead is not True:
225                 raise Exception("Can only reparent dead inode entry")
226             if item.fuse_entry.inode is None:
227                 raise Exception("Reparented entry must still have valid inode")
228             item.fuse_entry.dead = False
229             self._entries[name] = item.fuse_entry
230         elif isinstance(item, arvados.collection.RichCollectionBase):
231             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
232             self._entries[name].populate(mtime)
233         else:
234             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
235         item.fuse_entry = self._entries[name]
236
237     def on_event(self, event, collection, name, item):
238         if collection == self.collection:
239             name = sanitize_filename(name)
240             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
241             with llfuse.lock:
242                 if event == arvados.collection.ADD:
243                     self.new_entry(name, item, self.mtime())
244                 elif event == arvados.collection.DEL:
245                     ent = self._entries[name]
246                     del self._entries[name]
247                     self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
248                     self.inodes.del_entry(ent)
249                 elif event == arvados.collection.MOD:
250                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
251                         self.inodes.invalidate_inode(item.fuse_entry.inode)
252                     elif name in self._entries:
253                         self.inodes.invalidate_inode(self._entries[name].inode)
254
255     def populate(self, mtime):
256         self._mtime = mtime
257         self.collection.subscribe(self.on_event)
258         for entry, item in self.collection.items():
259             self.new_entry(entry, item, self.mtime())
260
261     def writable(self):
262         return self.collection.writable()
263
264     @use_counter
265     def flush(self):
266         with llfuse.lock_released:
267             self.collection.root_collection().save()
268
269     @use_counter
270     @check_update
271     def create(self, name):
272         with llfuse.lock_released:
273             self.collection.open(name, "w").close()
274
275     @use_counter
276     @check_update
277     def mkdir(self, name):
278         with llfuse.lock_released:
279             self.collection.mkdirs(name)
280
281     @use_counter
282     @check_update
283     def unlink(self, name):
284         with llfuse.lock_released:
285             self.collection.remove(name)
286         self.flush()
287
288     @use_counter
289     @check_update
290     def rmdir(self, name):
291         with llfuse.lock_released:
292             self.collection.remove(name)
293         self.flush()
294
295     @use_counter
296     @check_update
297     def rename(self, name_old, name_new, src):
298         if not isinstance(src, CollectionDirectoryBase):
299             raise llfuse.FUSEError(errno.EPERM)
300
301         if name_new in self:
302             ent = src[name_old]
303             tgt = self[name_new]
304             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
305                 pass
306             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
307                 if len(tgt) > 0:
308                     raise llfuse.FUSEError(errno.ENOTEMPTY)
309             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
310                 raise llfuse.FUSEError(errno.ENOTDIR)
311             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
312                 raise llfuse.FUSEError(errno.EISDIR)
313
314         with llfuse.lock_released:
315             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
316         self.flush()
317         src.flush()
318
319
320 class CollectionDirectory(CollectionDirectoryBase):
321     """Represents the root of a directory tree representing a collection."""
322
323     def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
324         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
325         self.api = api
326         self.num_retries = num_retries
327         self.collection_record_file = None
328         self.collection_record = None
329         if isinstance(collection_record, dict):
330             self.collection_locator = collection_record['uuid']
331             self._mtime = convertTime(collection_record.get('modified_at'))
332         else:
333             self.collection_locator = collection_record
334             self._mtime = 0
335         self._manifest_size = 0
336         if self.collection_locator:
337             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
338         self._updating_lock = threading.Lock()
339
340     def same(self, i):
341         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
342
343     def writable(self):
344         return self.collection.writable() if self.collection is not None else self._writable
345
346     # Used by arv-web.py to switch the contents of the CollectionDirectory
347     def change_collection(self, new_locator):
348         """Switch the contents of the CollectionDirectory.
349
350         Must be called with llfuse.lock held.
351         """
352
353         self.collection_locator = new_locator
354         self.collection_record = None
355         self.update()
356
357     def new_collection(self, new_collection_record, coll_reader):
358         if self.inode:
359             self.clear(force=True)
360
361         self.collection_record = new_collection_record
362
363         if self.collection_record:
364             self._mtime = convertTime(self.collection_record.get('modified_at'))
365             self.collection_locator = self.collection_record["uuid"]
366             if self.collection_record_file is not None:
367                 self.collection_record_file.update(self.collection_record)
368
369         self.collection = coll_reader
370         self.populate(self.mtime())
371
372     def uuid(self):
373         return self.collection_locator
374
375     @use_counter
376     def update(self, to_record_version=None):
377         try:
378             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
379                 return True
380
381             if self.collection_locator is None:
382                 self.fresh()
383                 return True
384
385             try:
386                 with llfuse.lock_released:
387                     self._updating_lock.acquire()
388                     if not self.stale():
389                         return
390
391                     _logger.debug("Updating %s", to_record_version)
392                     if self.collection is not None:
393                         if self.collection.known_past_version(to_record_version):
394                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
395                         else:
396                             self.collection.update()
397                     else:
398                         if uuid_pattern.match(self.collection_locator):
399                             coll_reader = arvados.collection.Collection(
400                                 self.collection_locator, self.api, self.api.keep,
401                                 num_retries=self.num_retries)
402                         else:
403                             coll_reader = arvados.collection.CollectionReader(
404                                 self.collection_locator, self.api, self.api.keep,
405                                 num_retries=self.num_retries)
406                         new_collection_record = coll_reader.api_response() or {}
407                         # If the Collection only exists in Keep, there will be no API
408                         # response.  Fill in the fields we need.
409                         if 'uuid' not in new_collection_record:
410                             new_collection_record['uuid'] = self.collection_locator
411                         if "portable_data_hash" not in new_collection_record:
412                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
413                         if 'manifest_text' not in new_collection_record:
414                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
415
416                         if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
417                             self.new_collection(new_collection_record, coll_reader)
418
419                         self._manifest_size = len(coll_reader.manifest_text())
420                         _logger.debug("%s manifest_size %i", self, self._manifest_size)
421                 # end with llfuse.lock_released, re-acquire lock
422
423                 self.fresh()
424                 return True
425             finally:
426                 self._updating_lock.release()
427         except arvados.errors.NotFoundError as e:
428             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
429         except arvados.errors.ArgumentError as detail:
430             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
431             if self.collection_record is not None and "manifest_text" in self.collection_record:
432                 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
433         except Exception:
434             _logger.exception("arv-mount %s: error", self.collection_locator)
435             if self.collection_record is not None and "manifest_text" in self.collection_record:
436                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
437         return False
438
439     @use_counter
440     @check_update
441     def __getitem__(self, item):
442         if item == '.arvados#collection':
443             if self.collection_record_file is None:
444                 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
445                 self.inodes.add_entry(self.collection_record_file)
446             return self.collection_record_file
447         else:
448             return super(CollectionDirectory, self).__getitem__(item)
449
450     def __contains__(self, k):
451         if k == '.arvados#collection':
452             return True
453         else:
454             return super(CollectionDirectory, self).__contains__(k)
455
456     def invalidate(self):
457         self.collection_record = None
458         self.collection_record_file = None
459         super(CollectionDirectory, self).invalidate()
460
461     def persisted(self):
462         return (self.collection_locator is not None)
463
464     def objsize(self):
465         # This is an empirically-derived heuristic to estimate the memory used
466         # to store this collection's metadata.  Calculating the memory
467         # footprint directly would be more accurate, but also more complicated.
468         return self._manifest_size * 128
469
470     def finalize(self):
471         if self.collection is not None:
472             if self.writable():
473                 self.collection.save()
474             self.collection.stop_threads()
475
476
477 class MagicDirectory(Directory):
478     """A special directory that logically contains the set of all extant keep locators.
479
480     When a file is referenced by lookup(), it is tested to see if it is a valid
481     keep locator to a manifest, and if so, loads the manifest contents as a
482     subdirectory of this directory with the locator as the directory name.
483     Since querying a list of all extant keep locators is impractical, only
484     collections that have already been accessed are visible to readdir().
485
486     """
487
488     README_TEXT = """
489 This directory provides access to Arvados collections as subdirectories listed
490 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
491 the form '1234567890abcdefghijklmnopqrstuv+123').
492
493 Note that this directory will appear empty until you attempt to access a
494 specific collection subdirectory (such as trying to 'cd' into it), at which
495 point the collection will actually be looked up on the server and the directory
496 will appear if it exists.
497 """.lstrip()
498
499     def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
500         super(MagicDirectory, self).__init__(parent_inode, inodes)
501         self.api = api
502         self.num_retries = num_retries
503         self.pdh_only = pdh_only
504
505     def __setattr__(self, name, value):
506         super(MagicDirectory, self).__setattr__(name, value)
507         # When we're assigned an inode, add a README.
508         if ((name == 'inode') and (self.inode is not None) and
509               (not self._entries)):
510             self._entries['README'] = self.inodes.add_entry(
511                 StringFile(self.inode, self.README_TEXT, time.time()))
512             # If we're the root directory, add an identical by_id subdirectory.
513             if self.inode == llfuse.ROOT_INODE:
514                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
515                         self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
516
517     def __contains__(self, k):
518         if k in self._entries:
519             return True
520
521         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
522             return False
523
524         try:
525             e = self.inodes.add_entry(CollectionDirectory(
526                     self.inode, self.inodes, self.api, self.num_retries, k))
527
528             if e.update():
529                 if k not in self._entries:
530                     self._entries[k] = e
531                 else:
532                     self.inodes.del_entry(e)
533                 return True
534             else:
535                 self.inodes.del_entry(e)
536                 return False
537         except Exception as e:
538             _logger.debug('arv-mount exception keep %s', e)
539             self.inodes.del_entry(e)
540             return False
541
542     def __getitem__(self, item):
543         if item in self:
544             return self._entries[item]
545         else:
546             raise KeyError("No collection with id " + item)
547
548     def clear(self, force=False):
549         pass
550
551
552 class RecursiveInvalidateDirectory(Directory):
553     def invalidate(self):
554         try:
555             super(RecursiveInvalidateDirectory, self).invalidate()
556             for a in self._entries:
557                 self._entries[a].invalidate()
558         except Exception:
559             _logger.exception()
560
561
562 class TagsDirectory(RecursiveInvalidateDirectory):
563     """A special directory that contains as subdirectories all tags visible to the user."""
564
565     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
566         super(TagsDirectory, self).__init__(parent_inode, inodes)
567         self.api = api
568         self.num_retries = num_retries
569         self._poll = True
570         self._poll_time = poll_time
571
572     @use_counter
573     def update(self):
574         with llfuse.lock_released:
575             tags = self.api.links().list(
576                 filters=[['link_class', '=', 'tag']],
577                 select=['name'], distinct=True
578                 ).execute(num_retries=self.num_retries)
579         if "items" in tags:
580             self.merge(tags['items'],
581                        lambda i: i['name'],
582                        lambda a, i: a.tag == i['name'],
583                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
584
585
586 class TagDirectory(Directory):
587     """A special directory that contains as subdirectories all collections visible
588     to the user that are tagged with a particular tag.
589     """
590
591     def __init__(self, parent_inode, inodes, api, num_retries, tag,
592                  poll=False, poll_time=60):
593         super(TagDirectory, self).__init__(parent_inode, inodes)
594         self.api = api
595         self.num_retries = num_retries
596         self.tag = tag
597         self._poll = poll
598         self._poll_time = poll_time
599
600     @use_counter
601     def update(self):
602         with llfuse.lock_released:
603             taggedcollections = self.api.links().list(
604                 filters=[['link_class', '=', 'tag'],
605                          ['name', '=', self.tag],
606                          ['head_uuid', 'is_a', 'arvados#collection']],
607                 select=['head_uuid']
608                 ).execute(num_retries=self.num_retries)
609         self.merge(taggedcollections['items'],
610                    lambda i: i['head_uuid'],
611                    lambda a, i: a.collection_locator == i['head_uuid'],
612                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
613
614
615 class ProjectDirectory(Directory):
616     """A special directory that contains the contents of a project."""
617
618     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
619                  poll=False, poll_time=60):
620         super(ProjectDirectory, self).__init__(parent_inode, inodes)
621         self.api = api
622         self.num_retries = num_retries
623         self.project_object = project_object
624         self.project_object_file = None
625         self.project_uuid = project_object['uuid']
626         self._poll = poll
627         self._poll_time = poll_time
628         self._updating_lock = threading.Lock()
629         self._current_user = None
630
631     def createDirectory(self, i):
632         if collection_uuid_pattern.match(i['uuid']):
633             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
634         elif group_uuid_pattern.match(i['uuid']):
635             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
636         elif link_uuid_pattern.match(i['uuid']):
637             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
638                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
639             else:
640                 return None
641         elif uuid_pattern.match(i['uuid']):
642             return ObjectFile(self.parent_inode, i)
643         else:
644             return None
645
646     def uuid(self):
647         return self.project_uuid
648
649     @use_counter
650     def update(self):
651         if self.project_object_file == None:
652             self.project_object_file = ObjectFile(self.inode, self.project_object)
653             self.inodes.add_entry(self.project_object_file)
654
655         def namefn(i):
656             if 'name' in i:
657                 if i['name'] is None or len(i['name']) == 0:
658                     return None
659                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
660                     # collection or subproject
661                     return i['name']
662                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
663                     # name link
664                     return i['name']
665                 elif 'kind' in i and i['kind'].startswith('arvados#'):
666                     # something else
667                     return "{}.{}".format(i['name'], i['kind'][8:])
668             else:
669                 return None
670
671         def samefn(a, i):
672             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
673                 return a.uuid() == i['uuid']
674             elif isinstance(a, ObjectFile):
675                 return a.uuid() == i['uuid'] and not a.stale()
676             return False
677
678         try:
679             with llfuse.lock_released:
680                 self._updating_lock.acquire()
681                 if not self.stale():
682                     return
683
684                 if group_uuid_pattern.match(self.project_uuid):
685                     self.project_object = self.api.groups().get(
686                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
687                 elif user_uuid_pattern.match(self.project_uuid):
688                     self.project_object = self.api.users().get(
689                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
690
691                 contents = arvados.util.list_all(self.api.groups().contents,
692                                                  self.num_retries, uuid=self.project_uuid)
693
694             # end with llfuse.lock_released, re-acquire lock
695
696             self.merge(contents,
697                        namefn,
698                        samefn,
699                        self.createDirectory)
700         finally:
701             self._updating_lock.release()
702
703     @use_counter
704     @check_update
705     def __getitem__(self, item):
706         if item == '.arvados#project':
707             return self.project_object_file
708         else:
709             return super(ProjectDirectory, self).__getitem__(item)
710
711     def __contains__(self, k):
712         if k == '.arvados#project':
713             return True
714         else:
715             return super(ProjectDirectory, self).__contains__(k)
716
717     @use_counter
718     @check_update
719     def writable(self):
720         with llfuse.lock_released:
721             if not self._current_user:
722                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
723             return self._current_user["uuid"] in self.project_object["writable_by"]
724
725     def persisted(self):
726         return True
727
728     @use_counter
729     @check_update
730     def mkdir(self, name):
731         try:
732             with llfuse.lock_released:
733                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
734                                                     "name": name,
735                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
736             self.invalidate()
737         except apiclient_errors.Error as error:
738             _logger.error(error)
739             raise llfuse.FUSEError(errno.EEXIST)
740
741     @use_counter
742     @check_update
743     def rmdir(self, name):
744         if name not in self:
745             raise llfuse.FUSEError(errno.ENOENT)
746         if not isinstance(self[name], CollectionDirectory):
747             raise llfuse.FUSEError(errno.EPERM)
748         if len(self[name]) > 0:
749             raise llfuse.FUSEError(errno.ENOTEMPTY)
750         with llfuse.lock_released:
751             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
752         self.invalidate()
753
754     @use_counter
755     @check_update
756     def rename(self, name_old, name_new, src):
757         if not isinstance(src, ProjectDirectory):
758             raise llfuse.FUSEError(errno.EPERM)
759
760         ent = src[name_old]
761
762         if not isinstance(ent, CollectionDirectory):
763             raise llfuse.FUSEError(errno.EPERM)
764
765         if name_new in self:
766             # POSIX semantics for replacing one directory with another is
767             # tricky (the target directory must be empty, the operation must be
768             # atomic which isn't possible with the Arvados API as of this
769             # writing) so don't support that.
770             raise llfuse.FUSEError(errno.EPERM)
771
772         self.api.collections().update(uuid=ent.uuid(),
773                                       body={"owner_uuid": self.uuid(),
774                                             "name": name_new}).execute(num_retries=self.num_retries)
775
776         # Acually move the entry from source directory to this directory.
777         del src._entries[name_old]
778         self._entries[name_new] = ent
779         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
780
781
782 class SharedDirectory(Directory):
783     """A special directory that represents users or groups who have shared projects with me."""
784
785     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
786                  poll=False, poll_time=60):
787         super(SharedDirectory, self).__init__(parent_inode, inodes)
788         self.api = api
789         self.num_retries = num_retries
790         self.current_user = api.users().current().execute(num_retries=num_retries)
791         self._poll = True
792         self._poll_time = poll_time
793
794     @use_counter
795     def update(self):
796         with llfuse.lock_released:
797             all_projects = arvados.util.list_all(
798                 self.api.groups().list, self.num_retries,
799                 filters=[['group_class','=','project']])
800             objects = {}
801             for ob in all_projects:
802                 objects[ob['uuid']] = ob
803
804             roots = []
805             root_owners = {}
806             for ob in all_projects:
807                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
808                     roots.append(ob)
809                     root_owners[ob['owner_uuid']] = True
810
811             lusers = arvados.util.list_all(
812                 self.api.users().list, self.num_retries,
813                 filters=[['uuid','in', list(root_owners)]])
814             lgroups = arvados.util.list_all(
815                 self.api.groups().list, self.num_retries,
816                 filters=[['uuid','in', list(root_owners)]])
817
818             users = {}
819             groups = {}
820
821             for l in lusers:
822                 objects[l["uuid"]] = l
823             for l in lgroups:
824                 objects[l["uuid"]] = l
825
826             contents = {}
827             for r in root_owners:
828                 if r in objects:
829                     obr = objects[r]
830                     if obr.get("name"):
831                         contents[obr["name"]] = obr
832                     #elif obr.get("username"):
833                     #    contents[obr["username"]] = obr
834                     elif "first_name" in obr:
835                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
836
837
838             for r in roots:
839                 if r['owner_uuid'] not in objects:
840                     contents[r['name']] = r
841
842         # end with llfuse.lock_released, re-acquire lock
843
844         try:
845             self.merge(contents.items(),
846                        lambda i: i[0],
847                        lambda a, i: a.uuid() == i[1]['uuid'],
848                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
849         except Exception:
850             _logger.exception()