Merge branch '6643-fuse-del-crash' closes #6643
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
14
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
17
18 _logger = logging.getLogger('arvados.arvados_fuse')
19
20
# Pattern for the characters that neither FUSE nor Linux accepts inside a
# file name: NUL and the path separator.  Collection file names containing
# them show up in the mount with "_" in their place.
_disallowed_filename_characters = re.compile('[\x00/]')


def sanitize_filename(dirty):
    """Return `dirty` rewritten so it is usable as a directory entry name.

    None is passed through unchanged.  The empty string and "." become
    "_", ".." becomes "__", and in any other name each NUL or "/"
    character is replaced by "_".
    """
    if dirty is None:
        return None
    # Names that are empty or collide with the special "." / ".." entries
    # are replaced wholesale.
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
38
39
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    Subclasses override update() to (re)build the entries, and the
    create/mkdir/unlink/rmdir/rename hooks to support modification; the
    versions defined here raise NotImplementedError.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: integer inode number of the containing directory

        :inodes: inode table object.  This class calls its touch(),
        add_entry(), del_entry(), invalidate_entry() and invalidate_inode()
        methods and reads its `encoding` attribute.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # No inode number yet.  NOTE(review): presumably assigned when the
        # directory is registered with the inode table -- confirm in caller.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; subclasses override this to return True."""
        return False

    def checkupdate(self):
        """Call update() if this directory is stale.

        An HttpError raised by the API client during the update is logged
        as a warning and swallowed, so the existing entries stay in use.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use warning().
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named `item`; raises KeyError if absent."""
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list copy of the (name, entry) pairs."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if an entry named `k` exists."""
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        """Return the number of entries."""
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.  May return None, in which
        case nothing is added for that item.

        When anything was added or removed, the directory's inode is
        invalidated and its mtime is set to the current time.  The directory
        is marked fresh in every case.
        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        # (whatever is still left in oldentries was neither reused nor
        # matched by same())
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Does nothing and returns False when the directory is in use, unless
        `force` is true.  Every child is cleared first; if any child's
        clear() returns a false value the previous entries dict is put back
        and False is returned.  Otherwise every entry is removed from the
        inode table, this directory is invalidated, and True is returned.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            for n in oldentries:
                if not oldentries[n].clear(force):
                    # A child refused to be cleared: restore the entries.
                    self._entries = oldentries
                    return False
            for n in oldentries:
                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                self.inodes.del_entry(oldentries[n])
            self.inodes.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        """Return the stored modification time."""
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        """No-op here; subclasses with pending changes override this."""
        pass

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
199
200
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the directory entry for collection `item`.

        An item that already carries a `fuse_entry` is reparented: that
        entry must be marked dead and still have an inode, otherwise an
        Exception is raised.  Any other item gets a new subdirectory (for
        RichCollectionBase items) or file entry.  The resulting entry is
        stored under the sanitized name and recorded on `item.fuse_entry`.
        """
        safe_name = sanitize_filename(name)
        previous = getattr(item, "fuse_entry", None)

        if previous is not None:
            if previous.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if previous.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            previous.dead = False
            entry = previous
            self._entries[safe_name] = entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, item)
            entry = self.inodes.add_entry(subdir)
            # Register the entry before filling it in.
            self._entries[safe_name] = entry
            entry.populate(mtime)
        else:
            entry = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
            self._entries[safe_name] = entry

        item.fuse_entry = entry

    def on_event(self, event, collection, name, item):
        """Collection notification callback; acts only on our own collection.

        ADD creates an entry, DEL removes the entry and invalidates its
        name, MOD invalidates the inode of the affected entry.  The work is
        done while holding llfuse.lock.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries.pop(name)
                self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                # Prefer the entry attached to the item itself; fall back
                # to whatever is registered under that name.
                target = getattr(item, "fuse_entry", None)
                if target is None:
                    target = self._entries.get(name)
                if target is not None:
                    self.inodes.invalidate_inode(target.inode)

    def populate(self, mtime):
        """Subscribe to collection events and add an entry for every item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry_name, member in self.collection.items():
            self.new_entry(entry_name, member, self.mtime())

    def writable(self):
        """Report whether the underlying collection is writable."""
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released meanwhile."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` by opening it for writing and closing it."""
        with llfuse.lock_released:
            handle = self.collection.open(name, "w")
            handle.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subcollection `name` in the collection."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def _remove_and_flush(self, name):
        """Remove `name` from the collection, then save via flush()."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove the file `name` and save the collection."""
        self._remove_and_flush(name)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove the subdirectory `name` and save the collection."""
        self._remove_and_flush(name)

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this one.

        Raises llfuse.FUSEError with EPERM when `src` is not a collection
        directory.  When the target name already exists it is overwritten,
        except that: a non-empty target directory gives ENOTEMPTY, a
        directory moved onto a file gives ENOTDIR, and a file moved onto a
        directory gives EISDIR.  Both directories are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            moving_file = isinstance(ent, FuseArvadosFile)
            moving_dir = isinstance(ent, CollectionDirectoryBase)
            onto_file = isinstance(tgt, FuseArvadosFile)
            onto_dir = isinstance(tgt, CollectionDirectoryBase)

            if moving_file and onto_file:
                # file over file: plain overwrite
                pass
            elif moving_dir and onto_dir:
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_dir and onto_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_file and onto_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()
317
318
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up an unloaded collection directory.

        :collection_record: either a dict (its 'uuid' becomes the locator
        and its 'modified_at' the mtime), or a locator string / None used
        directly as the locator.

        :explicit_collection: accepted but not used by this constructor.

        The collection itself is not fetched here; update() does that.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot hit AttributeError
        # when neither a collection nor a locator is set.
        self._writable = False
        if self.collection_locator:
            # Only locators matching uuid_pattern are treated as writable.
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` has our locator as uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection; before loading, use the locator-based guess."""
        return self.collection.writable() if self.collection is not None else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are force-cleared once this directory has an
        inode.  A non-empty record updates the mtime, the locator and the
        record file (if one was created), then the entries are repopulated.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True when the directory is up to date afterwards (including
        the cases where there was nothing to do), and False when an error
        occurred; errors are logged, not raised.

        :to_record_version: passed to the loaded collection's
        known_past_version(); when that returns true the collection's own
        update() is skipped.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Refreshed while we waited for the lock: that is
                        # success, so report True rather than a bare None
                        # that callers would read as failure.
                        return True

                    _logger.debug("Updating %s", to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The record file is created and registered in the inode table on
        first access.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#collection' is always present; otherwise defer to the base class."""
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached record and record file, then invalidate as usual."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when a collection locator is set."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()
474
475
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        """Set the attribute; on first inode assignment, add the fixed entries."""
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        """Return True if `k` is a known entry or a loadable collection.

        Names that match neither the portable data hash pattern nor the
        uuid pattern are rejected without contacting the server.  Otherwise
        a CollectionDirectory is created and updated; it is kept as an
        entry on success and removed from the inode table on failure.
        Exceptions are logged at debug level and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        # Kept in its own name so the except clause below cannot clobber it,
        # and so we know whether there is anything to clean up.
        ent = None
        try:
            ent = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if ent.update():
                if k not in self._entries:
                    self._entries[k] = ent
                else:
                    # Someone else registered this name while we were
                    # loading; discard our duplicate.
                    self.inodes.del_entry(ent)
                return True
            else:
                self.inodes.del_entry(ent)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if ent is not None:
                self.inodes.del_entry(ent)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`, loading it on demand.

        Raises KeyError when `item` is not a usable collection id.
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        """Do nothing (and return None); entries are never cleared here."""
        pass
548
549
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates each of its entries."""

    def invalidate(self):
        """Invalidate this directory, then every entry it holds.

        Any exception is logged with its traceback and not propagated.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # bare raises TypeError and hides the original error.
            _logger.exception("Error invalidating entries of inode %s", self.inode)
558
559
class TagsDirectory(RecursiveInvalidateDirectory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Fetch the distinct tag names and merge them into the entries.

        The API request runs with llfuse.lock released.  A response
        without an "items" key leaves the directory untouched.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                )
            tags = request.execute(num_retries=self.num_retries)

        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
582
583
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Fetch the collections carrying this tag and merge them in.

        The API request runs with llfuse.lock released; each tagged
        collection becomes a CollectionDirectory named by its uuid.
        """
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            request = self.api.links().list(filters=tag_filters,
                                            select=['head_uuid'])
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def is_same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api,
                                       self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name,
                   is_same_collection,
                   make_collection_directory)
611
612
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Set up the directory for `project_object` (a dict with a 'uuid' key).

        Nothing is fetched here; update() loads the contents.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        # Looked up on the first writable() call.
        self._current_user = None

    def createDirectory(self, i):
        """Build the entry object for contents item `i`, chosen by its uuid.

        Collections become CollectionDirectory, groups become
        ProjectDirectory, links pointing at a collection become a
        CollectionDirectory for the link's head, and any other object with
        a uuid becomes an ObjectFile.  Returns None for everything else.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        """Return the project uuid."""
        return self.project_uuid

    @use_counter
    def update(self):
        """Refresh the project record and merge its contents into the entries.

        The project object file is created on the first call.  API requests
        run with llfuse.lock released while holding the per-directory
        update lock; if the directory is no longer stale once that lock is
        held, nothing more is done.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            # Returns the entry name for item i, or None to skip it.  Items
            # that match none of the branches fall through and return None.
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else: append the kind with the leading
                    # 'arvados#' (8 characters) cut off
                    return "{}.{}".format(i['name'], i['kind'][8:])
            else:
                return None

        def samefn(a, i):
            # An existing entry is kept when it has the same uuid; object
            # files must additionally not be stale.
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().contents,
                                                 self.num_retries, uuid=self.project_uuid)

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       namefn,
                       samefn,
                       self.createDirectory)
        finally:
            self._updating_lock.release()

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#project' yields the project object file."""
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#project' is always present; otherwise defer to the base class."""
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user's uuid is in the project's 'writable_by' list.

        The current user record is fetched once and cached.
        """
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object["writable_by"]

    def persisted(self):
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called `name` owned by this project.

        Any API client error is logged and reported as FUSEError(EEXIST).
        """
        try:
            with llfuse.lock_released:
                self.api.collections().create(body={"owner_uuid": self.project_uuid,
                                                    "name": name,
                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection `name`.

        Raises FUSEError with ENOENT if there is no such entry, EPERM if it
        is not a collection, and ENOTEMPTY if it still has entries.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        if not isinstance(self[name], CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(self[name]) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection `name_old` from project `src` here as `name_new`.

        Raises FUSEError(EPERM) when `src` is not a project directory, when
        the entry is not a collection, or when `name_new` already exists.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
778
779
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Set up the directory and look up the current user.

        NOTE(review): `exclude` is accepted but never used, and `poll` is
        ignored (polling is always switched on) -- confirm this is intended.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the list of shared project roots and merge it in.

        A "root" is a project that is not owned by the current user and
        whose owner is not itself one of the listed projects.  Owners that
        can be looked up as users or groups get one entry each, named after
        the owner; roots whose owner cannot be looked up are listed
        directly by project name.  API requests run with llfuse.lock
        released.  Exceptions raised while merging are logged and swallowed.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            # The owner uuids may refer to users or to groups, so ask both.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if obr.get("name"):
                        contents[obr["name"]] = obr
                    #elif obr.get("username"):
                    #    contents[obr["username"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr


            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # bare raises TypeError and hides the original error.
            _logger.exception("Error updating shared directory (inode %s)", self.inode)