3198: Enable support for event bus based updates in arv-mount.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
14
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
17
18 _logger = logging.getLogger('arvados.arvados_fuse')
19
20
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
24 _disallowed_filename_characters = re.compile('[\x00/]')
25
26 def sanitize_filename(dirty):
27     """Replace disallowed filename characters with harmless "_"."""
28     if dirty is None:
29         return None
30     elif dirty == '':
31         return '_'
32     elif dirty == '.':
33         return '_'
34     elif dirty == '..':
35         return '__'
36     else:
37         return _disallowed_filename_characters.sub('_', dirty)
38
39
40 class Directory(FreshBase):
41     """Generic directory object, backed by a dict.
42
43     Consists of a set of entries with the key representing the filename
44     and the value referencing a File or Directory object.
45     """
46
47     def __init__(self, parent_inode, inodes):
48         """parent_inode is the integer inode number"""
49
50         super(Directory, self).__init__()
51
52         self.inode = None
53         if not isinstance(parent_inode, int):
54             raise Exception("parent_inode should be an int")
55         self.parent_inode = parent_inode
56         self.inodes = inodes
57         self._entries = {}
58         self._mtime = time.time()
59
60     #  Overriden by subclasses to implement logic to update the entries dict
61     #  when the directory is stale
62     @use_counter
63     def update(self):
64         pass
65
66     # Only used when computing the size of the disk footprint of the directory
67     # (stub)
68     def size(self):
69         return 0
70
71     def persisted(self):
72         return False
73
74     def checkupdate(self):
75         if self.stale():
76             try:
77                 self.update()
78             except apiclient.errors.HttpError as e:
79                 _logger.warn(e)
80
81     @use_counter
82     @check_update
83     def __getitem__(self, item):
84         return self._entries[item]
85
86     @use_counter
87     @check_update
88     def items(self):
89         return list(self._entries.items())
90
91     @use_counter
92     @check_update
93     def __contains__(self, k):
94         return k in self._entries
95
96     @use_counter
97     @check_update
98     def __len__(self):
99         return len(self._entries)
100
101     def fresh(self):
102         self.inodes.touch(self)
103         super(Directory, self).fresh()
104
105     def merge(self, items, fn, same, new_entry):
106         """Helper method for updating the contents of the directory.
107
108         Takes a list describing the new contents of the directory, reuse
109         entries that are the same in both the old and new lists, create new
110         entries, and delete old entries missing from the new list.
111
112         :items: iterable with new directory contents
113
114         :fn: function to take an entry in 'items' and return the desired file or
115         directory name, or None if this entry should be skipped
116
117         :same: function to compare an existing entry (a File or Directory
118         object) with an entry in the items list to determine whether to keep
119         the existing entry.
120
121         :new_entry: function to create a new directory entry (File or Directory
122         object) from an entry in the items list.
123
124         """
125
126         oldentries = self._entries
127         self._entries = {}
128         changed = False
129         for i in items:
130             name = sanitize_filename(fn(i))
131             if name:
132                 if name in oldentries and same(oldentries[name], i):
133                     # move existing directory entry over
134                     self._entries[name] = oldentries[name]
135                     del oldentries[name]
136                 else:
137                     _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
138                     # create new directory entry
139                     ent = new_entry(i)
140                     if ent is not None:
141                         self._entries[name] = self.inodes.add_entry(ent)
142                         changed = True
143
144         # delete any other directory entries that were not in found in 'items'
145         for i in oldentries:
146             _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
147             llfuse.invalidate_entry(self.inode, str(i))
148             self.inodes.del_entry(oldentries[i])
149             changed = True
150
151         if changed:
152             llfuse.invalidate_inode(self.inode)
153             self._mtime = time.time()
154
155         self.fresh()
156
157     def clear(self, force=False):
158         """Delete all entries"""
159
160         if not self.in_use() or force:
161             oldentries = self._entries
162             self._entries = {}
163             for n in oldentries:
164                 if not oldentries[n].clear(force):
165                     self._entries = oldentries
166                     return False
167             for n in oldentries:
168                 llfuse.invalidate_entry(self.inode, str(n))
169                 self.inodes.del_entry(oldentries[n])
170             llfuse.invalidate_inode(self.inode)
171             self.invalidate()
172             return True
173         else:
174             return False
175
176     def mtime(self):
177         return self._mtime
178
179     def writable(self):
180         return False
181
182     def flush(self):
183         pass
184
185     def create(self, name):
186         raise NotImplementedError()
187
188     def mkdir(self, name):
189         raise NotImplementedError()
190
191     def unlink(self, name):
192         raise NotImplementedError()
193
194     def rmdir(self, name):
195         raise NotImplementedError()
196
197     def rename(self, name_old, name_new, src):
198         raise NotImplementedError()
199
200
201 class CollectionDirectoryBase(Directory):
202     """Represent an Arvados Collection as a directory.
203
204     This class is used for Subcollections, and is also the base class for
205     CollectionDirectory, which implements collection loading/saving on
206     Collection records.
207
208     Most operations act only the underlying Arvados `Collection` object.  The
209     `Collection` object signals via a notify callback to
210     `CollectionDirectoryBase.on_event` that an item was added, removed or
211     modified.  FUSE inodes and directory entries are created, deleted or
212     invalidated in response to these events.
213
214     """
215
216     def __init__(self, parent_inode, inodes, collection):
217         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
218         self.collection = collection
219
220     def new_entry(self, name, item, mtime):
221         name = sanitize_filename(name)
222         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
223             if item.fuse_entry.dead is not True:
224                 raise Exception("Can only reparent dead inode entry")
225             if item.fuse_entry.inode is None:
226                 raise Exception("Reparented entry must still have valid inode")
227             item.fuse_entry.dead = False
228             self._entries[name] = item.fuse_entry
229         elif isinstance(item, arvados.collection.RichCollectionBase):
230             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
231             self._entries[name].populate(mtime)
232         else:
233             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
234         item.fuse_entry = self._entries[name]
235
236     def on_event(self, event, collection, name, item):
237         if collection == self.collection:
238             _logger.debug("%s %s %s %s", event, collection, name, item)
239             with llfuse.lock:
240                 if event == arvados.collection.ADD:
241                     self.new_entry(name, item, self.mtime())
242                 elif event == arvados.collection.DEL:
243                     ent = self._entries[name]
244                     del self._entries[name]
245                     llfuse.invalidate_entry(self.inode, name)
246                     self.inodes.del_entry(ent)
247                 elif event == arvados.collection.MOD:
248                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
249                         llfuse.invalidate_inode(item.fuse_entry.inode)
250                     elif name in self._entries:
251                         llfuse.invalidate_inode(self._entries[name].inode)
252
253     def populate(self, mtime):
254         self._mtime = mtime
255         self.collection.subscribe(self.on_event)
256         for entry, item in self.collection.items():
257             self.new_entry(entry, item, self.mtime())
258
259     def writable(self):
260         return self.collection.writable()
261
262     @use_counter
263     def flush(self):
264         with llfuse.lock_released:
265             self.collection.root_collection().save()
266
267     @use_counter
268     @check_update
269     def create(self, name):
270         with llfuse.lock_released:
271             self.collection.open(name, "w").close()
272
273     @use_counter
274     @check_update
275     def mkdir(self, name):
276         with llfuse.lock_released:
277             self.collection.mkdirs(name)
278
279     @use_counter
280     @check_update
281     def unlink(self, name):
282         with llfuse.lock_released:
283             self.collection.remove(name)
284         self.flush()
285
286     @use_counter
287     @check_update
288     def rmdir(self, name):
289         with llfuse.lock_released:
290             self.collection.remove(name)
291         self.flush()
292
293     @use_counter
294     @check_update
295     def rename(self, name_old, name_new, src):
296         if not isinstance(src, CollectionDirectoryBase):
297             raise llfuse.FUSEError(errno.EPERM)
298
299         if name_new in self:
300             ent = src[name_old]
301             tgt = self[name_new]
302             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
303                 pass
304             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
305                 if len(tgt) > 0:
306                     raise llfuse.FUSEError(errno.ENOTEMPTY)
307             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
308                 raise llfuse.FUSEError(errno.ENOTDIR)
309             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
310                 raise llfuse.FUSEError(errno.EISDIR)
311
312         with llfuse.lock_released:
313             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
314         self.flush()
315         src.flush()
316
317
318 class CollectionDirectory(CollectionDirectoryBase):
319     """Represents the root of a directory tree representing a collection."""
320
321     def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
322         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
323         self.api = api
324         self.num_retries = num_retries
325         self.collection_record_file = None
326         self.collection_record = None
327         if isinstance(collection_record, dict):
328             self.collection_locator = collection_record['uuid']
329             self._mtime = convertTime(collection_record.get('modified_at'))
330         else:
331             self.collection_locator = collection_record
332             self._mtime = 0
333         self._manifest_size = 0
334         if self.collection_locator:
335             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
336         self._updating_lock = threading.Lock()
337
338     def same(self, i):
339         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
340
341     def writable(self):
342         return self.collection.writable() if self.collection is not None else self._writable
343
344     # Used by arv-web.py to switch the contents of the CollectionDirectory
345     def change_collection(self, new_locator):
346         """Switch the contents of the CollectionDirectory.
347
348         Must be called with llfuse.lock held.
349         """
350
351         self.collection_locator = new_locator
352         self.collection_record = None
353         self.update()
354
355     def new_collection(self, new_collection_record, coll_reader):
356         if self.inode:
357             self.clear(force=True)
358
359         self.collection_record = new_collection_record
360
361         if self.collection_record:
362             self._mtime = convertTime(self.collection_record.get('modified_at'))
363             self.collection_locator = self.collection_record["uuid"]
364             if self.collection_record_file is not None:
365                 self.collection_record_file.update(self.collection_record)
366
367         self.collection = coll_reader
368         self.populate(self.mtime())
369
370     def uuid(self):
371         return self.collection_locator
372
373     @use_counter
374     def update(self, to_pdh=None):
375         try:
376             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
377                 return True
378
379             if self.collection_locator is None:
380                 self.fresh()
381                 return True
382
383             try:
384                 with llfuse.lock_released:
385                     self._updating_lock.acquire()
386                     if not self.stale():
387                         return
388
389                     _logger.debug("Updating %s", self.collection_locator)
390                     if self.collection:
391                         if self.collection.portable_data_hash() == to_pdh:
392                             _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
393                         else:
394                             self.collection.update()
395                     else:
396                         if uuid_pattern.match(self.collection_locator):
397                             coll_reader = arvados.collection.Collection(
398                                 self.collection_locator, self.api, self.api.keep,
399                                 num_retries=self.num_retries)
400                         else:
401                             coll_reader = arvados.collection.CollectionReader(
402                                 self.collection_locator, self.api, self.api.keep,
403                                 num_retries=self.num_retries)
404                         new_collection_record = coll_reader.api_response() or {}
405                         # If the Collection only exists in Keep, there will be no API
406                         # response.  Fill in the fields we need.
407                         if 'uuid' not in new_collection_record:
408                             new_collection_record['uuid'] = self.collection_locator
409                         if "portable_data_hash" not in new_collection_record:
410                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
411                         if 'manifest_text' not in new_collection_record:
412                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
413
414                         if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
415                             self.new_collection(new_collection_record, coll_reader)
416
417                         self._manifest_size = len(coll_reader.manifest_text())
418                         _logger.debug("%s manifest_size %i", self, self._manifest_size)
419                 # end with llfuse.lock_released, re-acquire lock
420
421                 self.fresh()
422                 return True
423             finally:
424                 self._updating_lock.release()
425         except arvados.errors.NotFoundError:
426             _logger.exception("arv-mount %s: error", self.collection_locator)
427         except arvados.errors.ArgumentError as detail:
428             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
429             if self.collection_record is not None and "manifest_text" in self.collection_record:
430                 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
431         except Exception:
432             _logger.exception("arv-mount %s: error", self.collection_locator)
433             if self.collection_record is not None and "manifest_text" in self.collection_record:
434                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
435         return False
436
437     @use_counter
438     @check_update
439     def __getitem__(self, item):
440         if item == '.arvados#collection':
441             if self.collection_record_file is None:
442                 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
443                 self.inodes.add_entry(self.collection_record_file)
444             return self.collection_record_file
445         else:
446             return super(CollectionDirectory, self).__getitem__(item)
447
448     def __contains__(self, k):
449         if k == '.arvados#collection':
450             return True
451         else:
452             return super(CollectionDirectory, self).__contains__(k)
453
454     def invalidate(self):
455         self.collection_record = None
456         self.collection_record_file = None
457         super(CollectionDirectory, self).invalidate()
458
459     def persisted(self):
460         return (self.collection_locator is not None)
461
462     def objsize(self):
463         # This is an empirically-derived heuristic to estimate the memory used
464         # to store this collection's metadata.  Calculating the memory
465         # footprint directly would be more accurate, but also more complicated.
466         return self._manifest_size * 128
467
468
469 class MagicDirectory(Directory):
470     """A special directory that logically contains the set of all extant keep locators.
471
472     When a file is referenced by lookup(), it is tested to see if it is a valid
473     keep locator to a manifest, and if so, loads the manifest contents as a
474     subdirectory of this directory with the locator as the directory name.
475     Since querying a list of all extant keep locators is impractical, only
476     collections that have already been accessed are visible to readdir().
477
478     """
479
480     README_TEXT = """
481 This directory provides access to Arvados collections as subdirectories listed
482 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
483 the form '1234567890abcdefghijklmnopqrstuv+123').
484
485 Note that this directory will appear empty until you attempt to access a
486 specific collection subdirectory (such as trying to 'cd' into it), at which
487 point the collection will actually be looked up on the server and the directory
488 will appear if it exists.
489 """.lstrip()
490
491     def __init__(self, parent_inode, inodes, api, num_retries):
492         super(MagicDirectory, self).__init__(parent_inode, inodes)
493         self.api = api
494         self.num_retries = num_retries
495
496     def __setattr__(self, name, value):
497         super(MagicDirectory, self).__setattr__(name, value)
498         # When we're assigned an inode, add a README.
499         if ((name == 'inode') and (self.inode is not None) and
500               (not self._entries)):
501             self._entries['README'] = self.inodes.add_entry(
502                 StringFile(self.inode, self.README_TEXT, time.time()))
503             # If we're the root directory, add an identical by_id subdirectory.
504             if self.inode == llfuse.ROOT_INODE:
505                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
506                         self.inode, self.inodes, self.api, self.num_retries))
507
508     def __contains__(self, k):
509         if k in self._entries:
510             return True
511
512         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
513             return False
514
515         try:
516             e = self.inodes.add_entry(CollectionDirectory(
517                     self.inode, self.inodes, self.api, self.num_retries, k))
518
519             if e.update():
520                 self._entries[k] = e
521                 return True
522             else:
523                 return False
524         except Exception as e:
525             _logger.debug('arv-mount exception keep %s', e)
526             return False
527
528     def __getitem__(self, item):
529         if item in self:
530             return self._entries[item]
531         else:
532             raise KeyError("No collection with id " + item)
533
534     def clear(self, force=False):
535         pass
536
537
538 class RecursiveInvalidateDirectory(Directory):
539     def invalidate(self):
540         try:
541             super(RecursiveInvalidateDirectory, self).invalidate()
542             for a in self._entries:
543                 self._entries[a].invalidate()
544         except Exception:
545             _logger.exception()
546
547
548 class TagsDirectory(RecursiveInvalidateDirectory):
549     """A special directory that contains as subdirectories all tags visible to the user."""
550
551     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
552         super(TagsDirectory, self).__init__(parent_inode, inodes)
553         self.api = api
554         self.num_retries = num_retries
555         self._poll = True
556         self._poll_time = poll_time
557
558     @use_counter
559     def update(self):
560         with llfuse.lock_released:
561             tags = self.api.links().list(
562                 filters=[['link_class', '=', 'tag']],
563                 select=['name'], distinct=True
564                 ).execute(num_retries=self.num_retries)
565         if "items" in tags:
566             self.merge(tags['items'],
567                        lambda i: i['name'],
568                        lambda a, i: a.tag == i['name'],
569                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
570
571
572 class TagDirectory(Directory):
573     """A special directory that contains as subdirectories all collections visible
574     to the user that are tagged with a particular tag.
575     """
576
577     def __init__(self, parent_inode, inodes, api, num_retries, tag,
578                  poll=False, poll_time=60):
579         super(TagDirectory, self).__init__(parent_inode, inodes)
580         self.api = api
581         self.num_retries = num_retries
582         self.tag = tag
583         self._poll = poll
584         self._poll_time = poll_time
585
586     @use_counter
587     def update(self):
588         with llfuse.lock_released:
589             taggedcollections = self.api.links().list(
590                 filters=[['link_class', '=', 'tag'],
591                          ['name', '=', self.tag],
592                          ['head_uuid', 'is_a', 'arvados#collection']],
593                 select=['head_uuid']
594                 ).execute(num_retries=self.num_retries)
595         self.merge(taggedcollections['items'],
596                    lambda i: i['head_uuid'],
597                    lambda a, i: a.collection_locator == i['head_uuid'],
598                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
599
600
601 class ProjectDirectory(Directory):
602     """A special directory that contains the contents of a project."""
603
604     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
605                  poll=False, poll_time=60):
606         super(ProjectDirectory, self).__init__(parent_inode, inodes)
607         self.api = api
608         self.num_retries = num_retries
609         self.project_object = project_object
610         self.project_object_file = None
611         self.project_uuid = project_object['uuid']
612         self._poll = poll
613         self._poll_time = poll_time
614         self._updating_lock = threading.Lock()
615         self._current_user = None
616
617     def createDirectory(self, i):
618         if collection_uuid_pattern.match(i['uuid']):
619             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
620         elif group_uuid_pattern.match(i['uuid']):
621             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
622         elif link_uuid_pattern.match(i['uuid']):
623             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
624                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
625             else:
626                 return None
627         elif uuid_pattern.match(i['uuid']):
628             return ObjectFile(self.parent_inode, i)
629         else:
630             return None
631
632     def uuid(self):
633         return self.project_uuid
634
635     @use_counter
636     def update(self):
637         if self.project_object_file == None:
638             self.project_object_file = ObjectFile(self.inode, self.project_object)
639             self.inodes.add_entry(self.project_object_file)
640
641         def namefn(i):
642             if 'name' in i:
643                 if i['name'] is None or len(i['name']) == 0:
644                     return None
645                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
646                     # collection or subproject
647                     return i['name']
648                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
649                     # name link
650                     return i['name']
651                 elif 'kind' in i and i['kind'].startswith('arvados#'):
652                     # something else
653                     return "{}.{}".format(i['name'], i['kind'][8:])
654             else:
655                 return None
656
657         def samefn(a, i):
658             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
659                 return a.uuid() == i['uuid']
660             elif isinstance(a, ObjectFile):
661                 return a.uuid() == i['uuid'] and not a.stale()
662             return False
663
664         try:
665             with llfuse.lock_released:
666                 self._updating_lock.acquire()
667                 if not self.stale():
668                     return
669
670                 if group_uuid_pattern.match(self.project_uuid):
671                     self.project_object = self.api.groups().get(
672                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
673                 elif user_uuid_pattern.match(self.project_uuid):
674                     self.project_object = self.api.users().get(
675                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
676
677                 contents = arvados.util.list_all(self.api.groups().contents,
678                                                  self.num_retries, uuid=self.project_uuid)
679
680             # end with llfuse.lock_released, re-acquire lock
681
682             self.merge(contents,
683                        namefn,
684                        samefn,
685                        self.createDirectory)
686         finally:
687             self._updating_lock.release()
688
689     @use_counter
690     @check_update
691     def __getitem__(self, item):
692         if item == '.arvados#project':
693             return self.project_object_file
694         else:
695             return super(ProjectDirectory, self).__getitem__(item)
696
697     def __contains__(self, k):
698         if k == '.arvados#project':
699             return True
700         else:
701             return super(ProjectDirectory, self).__contains__(k)
702
703     @use_counter
704     @check_update
705     def writable(self):
706         with llfuse.lock_released:
707             if not self._current_user:
708                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
709             return self._current_user["uuid"] in self.project_object["writable_by"]
710
711     def persisted(self):
712         return True
713
714     @use_counter
715     @check_update
716     def mkdir(self, name):
717         try:
718             with llfuse.lock_released:
719                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
720                                                     "name": name,
721                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
722             self.invalidate()
723         except apiclient_errors.Error as error:
724             _logger.error(error)
725             raise llfuse.FUSEError(errno.EEXIST)
726
727     @use_counter
728     @check_update
729     def rmdir(self, name):
730         if name not in self:
731             raise llfuse.FUSEError(errno.ENOENT)
732         if not isinstance(self[name], CollectionDirectory):
733             raise llfuse.FUSEError(errno.EPERM)
734         if len(self[name]) > 0:
735             raise llfuse.FUSEError(errno.ENOTEMPTY)
736         with llfuse.lock_released:
737             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
738         self.invalidate()
739
740     @use_counter
741     @check_update
742     def rename(self, name_old, name_new, src):
743         if not isinstance(src, ProjectDirectory):
744             raise llfuse.FUSEError(errno.EPERM)
745
746         ent = src[name_old]
747
748         if not isinstance(ent, CollectionDirectory):
749             raise llfuse.FUSEError(errno.EPERM)
750
751         if name_new in self:
752             # POSIX semantics for replacing one directory with another is
753             # tricky (the target directory must be empty, the operation must be
754             # atomic which isn't possible with the Arvados API as of this
755             # writing) so don't support that.
756             raise llfuse.FUSEError(errno.EPERM)
757
758         self.api.collections().update(uuid=ent.uuid(),
759                                       body={"owner_uuid": self.uuid(),
760                                             "name": name_new}).execute(num_retries=self.num_retries)
761
762         # Acually move the entry from source directory to this directory.
763         del src._entries[name_old]
764         self._entries[name_new] = ent
765         llfuse.invalidate_entry(src.inode, name_old)
766
767
768 class SharedDirectory(Directory):
769     """A special directory that represents users or groups who have shared projects with me."""
770
771     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
772                  poll=False, poll_time=60):
773         super(SharedDirectory, self).__init__(parent_inode, inodes)
774         self.api = api
775         self.num_retries = num_retries
776         self.current_user = api.users().current().execute(num_retries=num_retries)
777         self._poll = True
778         self._poll_time = poll_time
779
780     @use_counter
781     def update(self):
782         with llfuse.lock_released:
783             all_projects = arvados.util.list_all(
784                 self.api.groups().list, self.num_retries,
785                 filters=[['group_class','=','project']])
786             objects = {}
787             for ob in all_projects:
788                 objects[ob['uuid']] = ob
789
790             roots = []
791             root_owners = {}
792             for ob in all_projects:
793                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
794                     roots.append(ob)
795                     root_owners[ob['owner_uuid']] = True
796
797             lusers = arvados.util.list_all(
798                 self.api.users().list, self.num_retries,
799                 filters=[['uuid','in', list(root_owners)]])
800             lgroups = arvados.util.list_all(
801                 self.api.groups().list, self.num_retries,
802                 filters=[['uuid','in', list(root_owners)]])
803
804             users = {}
805             groups = {}
806
807             for l in lusers:
808                 objects[l["uuid"]] = l
809             for l in lgroups:
810                 objects[l["uuid"]] = l
811
812             contents = {}
813             for r in root_owners:
814                 if r in objects:
815                     obr = objects[r]
816                     if obr.get("name"):
817                         contents[obr["name"]] = obr
818                     #elif obr.get("username"):
819                     #    contents[obr["username"]] = obr
820                     elif "first_name" in obr:
821                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
822
823
824             for r in roots:
825                 if r['owner_uuid'] not in objects:
826                     contents[r['name']] = r
827
828         # end with llfuse.lock_released, re-acquire lock
829
830         try:
831             self.merge(contents.items(),
832                        lambda i: i[0],
833                        lambda a, i: a.uuid() == i[1]['uuid'],
834                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
835         except Exception:
836             _logger.exception()