closes #6277
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
14
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
17
18 _logger = logging.getLogger('arvados.arvados_fuse')
19
20
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
24 _disallowed_filename_characters = re.compile('[\x00/]')
25
26 # '.' and '..' are not reachable if API server is newer than #6277
27 def sanitize_filename(dirty):
28     """Replace disallowed filename characters with harmless "_"."""
29     if dirty is None:
30         return None
31     elif dirty == '':
32         return '_'
33     elif dirty == '.':
34         return '_'
35     elif dirty == '..':
36         return '__'
37     else:
38         return _disallowed_filename_characters.sub('_', dirty)
39
40
41 class Directory(FreshBase):
42     """Generic directory object, backed by a dict.
43
44     Consists of a set of entries with the key representing the filename
45     and the value referencing a File or Directory object.
46     """
47
48     def __init__(self, parent_inode, inodes):
49         """parent_inode is the integer inode number"""
50
51         super(Directory, self).__init__()
52
53         self.inode = None
54         if not isinstance(parent_inode, int):
55             raise Exception("parent_inode should be an int")
56         self.parent_inode = parent_inode
57         self.inodes = inodes
58         self._entries = {}
59         self._mtime = time.time()
60
61     #  Overriden by subclasses to implement logic to update the entries dict
62     #  when the directory is stale
63     @use_counter
64     def update(self):
65         pass
66
67     # Only used when computing the size of the disk footprint of the directory
68     # (stub)
69     def size(self):
70         return 0
71
72     def persisted(self):
73         return False
74
75     def checkupdate(self):
76         if self.stale():
77             try:
78                 self.update()
79             except apiclient.errors.HttpError as e:
80                 _logger.warn(e)
81
82     @use_counter
83     @check_update
84     def __getitem__(self, item):
85         return self._entries[item]
86
87     @use_counter
88     @check_update
89     def items(self):
90         return list(self._entries.items())
91
92     @use_counter
93     @check_update
94     def __contains__(self, k):
95         return k in self._entries
96
97     @use_counter
98     @check_update
99     def __len__(self):
100         return len(self._entries)
101
102     def fresh(self):
103         self.inodes.touch(self)
104         super(Directory, self).fresh()
105
106     def merge(self, items, fn, same, new_entry):
107         """Helper method for updating the contents of the directory.
108
109         Takes a list describing the new contents of the directory, reuse
110         entries that are the same in both the old and new lists, create new
111         entries, and delete old entries missing from the new list.
112
113         :items: iterable with new directory contents
114
115         :fn: function to take an entry in 'items' and return the desired file or
116         directory name, or None if this entry should be skipped
117
118         :same: function to compare an existing entry (a File or Directory
119         object) with an entry in the items list to determine whether to keep
120         the existing entry.
121
122         :new_entry: function to create a new directory entry (File or Directory
123         object) from an entry in the items list.
124
125         """
126
127         oldentries = self._entries
128         self._entries = {}
129         changed = False
130         for i in items:
131             name = sanitize_filename(fn(i))
132             if name:
133                 if name in oldentries and same(oldentries[name], i):
134                     # move existing directory entry over
135                     self._entries[name] = oldentries[name]
136                     del oldentries[name]
137                 else:
138                     _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
139                     # create new directory entry
140                     ent = new_entry(i)
141                     if ent is not None:
142                         self._entries[name] = self.inodes.add_entry(ent)
143                         changed = True
144
145         # delete any other directory entries that were not in found in 'items'
146         for i in oldentries:
147             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
148             self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
149             self.inodes.del_entry(oldentries[i])
150             changed = True
151
152         if changed:
153             self.inodes.invalidate_inode(self.inode)
154             self._mtime = time.time()
155
156         self.fresh()
157
158     def clear(self, force=False):
159         """Delete all entries"""
160
161         if not self.in_use() or force:
162             oldentries = self._entries
163             self._entries = {}
164             for n in oldentries:
165                 if not oldentries[n].clear(force):
166                     self._entries = oldentries
167                     return False
168             for n in oldentries:
169                 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
170                 self.inodes.del_entry(oldentries[n])
171             self.inodes.invalidate_inode(self.inode)
172             self.invalidate()
173             return True
174         else:
175             return False
176
177     def mtime(self):
178         return self._mtime
179
180     def writable(self):
181         return False
182
183     def flush(self):
184         pass
185
186     def create(self, name):
187         raise NotImplementedError()
188
189     def mkdir(self, name):
190         raise NotImplementedError()
191
192     def unlink(self, name):
193         raise NotImplementedError()
194
195     def rmdir(self, name):
196         raise NotImplementedError()
197
198     def rename(self, name_old, name_new, src):
199         raise NotImplementedError()
200
201
202 class CollectionDirectoryBase(Directory):
203     """Represent an Arvados Collection as a directory.
204
205     This class is used for Subcollections, and is also the base class for
206     CollectionDirectory, which implements collection loading/saving on
207     Collection records.
208
209     Most operations act only the underlying Arvados `Collection` object.  The
210     `Collection` object signals via a notify callback to
211     `CollectionDirectoryBase.on_event` that an item was added, removed or
212     modified.  FUSE inodes and directory entries are created, deleted or
213     invalidated in response to these events.
214
215     """
216
217     def __init__(self, parent_inode, inodes, collection):
218         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
219         self.collection = collection
220
221     def new_entry(self, name, item, mtime):
222         name = sanitize_filename(name)
223         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
224             if item.fuse_entry.dead is not True:
225                 raise Exception("Can only reparent dead inode entry")
226             if item.fuse_entry.inode is None:
227                 raise Exception("Reparented entry must still have valid inode")
228             item.fuse_entry.dead = False
229             self._entries[name] = item.fuse_entry
230         elif isinstance(item, arvados.collection.RichCollectionBase):
231             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
232             self._entries[name].populate(mtime)
233         else:
234             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
235         item.fuse_entry = self._entries[name]
236
237     def on_event(self, event, collection, name, item):
238         if collection == self.collection:
239             name = sanitize_filename(name)
240             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
241             with llfuse.lock:
242                 if event == arvados.collection.ADD:
243                     self.new_entry(name, item, self.mtime())
244                 elif event == arvados.collection.DEL:
245                     ent = self._entries[name]
246                     del self._entries[name]
247                     self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
248                     self.inodes.del_entry(ent)
249                 elif event == arvados.collection.MOD:
250                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
251                         self.inodes.invalidate_inode(item.fuse_entry.inode)
252                     elif name in self._entries:
253                         self.inodes.invalidate_inode(self._entries[name].inode)
254
255     def populate(self, mtime):
256         self._mtime = mtime
257         self.collection.subscribe(self.on_event)
258         for entry, item in self.collection.items():
259             self.new_entry(entry, item, self.mtime())
260
261     def writable(self):
262         return self.collection.writable()
263
264     @use_counter
265     def flush(self):
266         with llfuse.lock_released:
267             self.collection.root_collection().save()
268
269     @use_counter
270     @check_update
271     def create(self, name):
272         with llfuse.lock_released:
273             self.collection.open(name, "w").close()
274
275     @use_counter
276     @check_update
277     def mkdir(self, name):
278         with llfuse.lock_released:
279             self.collection.mkdirs(name)
280
281     @use_counter
282     @check_update
283     def unlink(self, name):
284         with llfuse.lock_released:
285             self.collection.remove(name)
286         self.flush()
287
288     @use_counter
289     @check_update
290     def rmdir(self, name):
291         with llfuse.lock_released:
292             self.collection.remove(name)
293         self.flush()
294
295     @use_counter
296     @check_update
297     def rename(self, name_old, name_new, src):
298         if not isinstance(src, CollectionDirectoryBase):
299             raise llfuse.FUSEError(errno.EPERM)
300
301         if name_new in self:
302             ent = src[name_old]
303             tgt = self[name_new]
304             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
305                 pass
306             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
307                 if len(tgt) > 0:
308                     raise llfuse.FUSEError(errno.ENOTEMPTY)
309             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
310                 raise llfuse.FUSEError(errno.ENOTDIR)
311             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
312                 raise llfuse.FUSEError(errno.EISDIR)
313
314         with llfuse.lock_released:
315             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
316         self.flush()
317         src.flush()
318
319
320 class CollectionDirectory(CollectionDirectoryBase):
321     """Represents the root of a directory tree representing a collection."""
322
323     def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
324         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
325         self.api = api
326         self.num_retries = num_retries
327         self.collection_record_file = None
328         self.collection_record = None
329         if isinstance(collection_record, dict):
330             self.collection_locator = collection_record['uuid']
331             self._mtime = convertTime(collection_record.get('modified_at'))
332         else:
333             self.collection_locator = collection_record
334             self._mtime = 0
335         self._manifest_size = 0
336         if self.collection_locator:
337             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
338         self._updating_lock = threading.Lock()
339
340     def same(self, i):
341         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
342
343     def writable(self):
344         return self.collection.writable() if self.collection is not None else self._writable
345
346     # Used by arv-web.py to switch the contents of the CollectionDirectory
347     def change_collection(self, new_locator):
348         """Switch the contents of the CollectionDirectory.
349
350         Must be called with llfuse.lock held.
351         """
352
353         self.collection_locator = new_locator
354         self.collection_record = None
355         self.update()
356
357     def new_collection(self, new_collection_record, coll_reader):
358         if self.inode:
359             self.clear(force=True)
360
361         self.collection_record = new_collection_record
362
363         if self.collection_record:
364             self._mtime = convertTime(self.collection_record.get('modified_at'))
365             self.collection_locator = self.collection_record["uuid"]
366             if self.collection_record_file is not None:
367                 self.collection_record_file.update(self.collection_record)
368
369         self.collection = coll_reader
370         self.populate(self.mtime())
371
372     def uuid(self):
373         return self.collection_locator
374
375     @use_counter
376     def update(self, to_record_version=None):
377         try:
378             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
379                 return True
380
381             if self.collection_locator is None:
382                 self.fresh()
383                 return True
384
385             try:
386                 with llfuse.lock_released:
387                     self._updating_lock.acquire()
388                     if not self.stale():
389                         return
390
391                     _logger.debug("Updating %s", to_record_version)
392                     if self.collection is not None:
393                         if self.collection.known_past_version(to_record_version):
394                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
395                         else:
396                             self.collection.update()
397                     else:
398                         if uuid_pattern.match(self.collection_locator):
399                             coll_reader = arvados.collection.Collection(
400                                 self.collection_locator, self.api, self.api.keep,
401                                 num_retries=self.num_retries)
402                         else:
403                             coll_reader = arvados.collection.CollectionReader(
404                                 self.collection_locator, self.api, self.api.keep,
405                                 num_retries=self.num_retries)
406                         new_collection_record = coll_reader.api_response() or {}
407                         # If the Collection only exists in Keep, there will be no API
408                         # response.  Fill in the fields we need.
409                         if 'uuid' not in new_collection_record:
410                             new_collection_record['uuid'] = self.collection_locator
411                         if "portable_data_hash" not in new_collection_record:
412                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
413                         if 'manifest_text' not in new_collection_record:
414                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
415
416                         if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
417                             self.new_collection(new_collection_record, coll_reader)
418
419                         self._manifest_size = len(coll_reader.manifest_text())
420                         _logger.debug("%s manifest_size %i", self, self._manifest_size)
421                 # end with llfuse.lock_released, re-acquire lock
422
423                 self.fresh()
424                 return True
425             finally:
426                 self._updating_lock.release()
427         except arvados.errors.NotFoundError as e:
428             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
429         except arvados.errors.ArgumentError as detail:
430             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
431             if self.collection_record is not None and "manifest_text" in self.collection_record:
432                 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
433         except Exception:
434             _logger.exception("arv-mount %s: error", self.collection_locator)
435             if self.collection_record is not None and "manifest_text" in self.collection_record:
436                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
437         return False
438
439     @use_counter
440     @check_update
441     def __getitem__(self, item):
442         if item == '.arvados#collection':
443             if self.collection_record_file is None:
444                 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
445                 self.inodes.add_entry(self.collection_record_file)
446             return self.collection_record_file
447         else:
448             return super(CollectionDirectory, self).__getitem__(item)
449
450     def __contains__(self, k):
451         if k == '.arvados#collection':
452             return True
453         else:
454             return super(CollectionDirectory, self).__contains__(k)
455
456     def invalidate(self):
457         self.collection_record = None
458         self.collection_record_file = None
459         super(CollectionDirectory, self).invalidate()
460
461     def persisted(self):
462         return (self.collection_locator is not None)
463
464     def objsize(self):
465         # This is an empirically-derived heuristic to estimate the memory used
466         # to store this collection's metadata.  Calculating the memory
467         # footprint directly would be more accurate, but also more complicated.
468         return self._manifest_size * 128
469
470     def finalize(self):
471         if self.collection is not None:
472             if self.writable():
473                 self.collection.save()
474             self.collection.stop_threads()
475
476
477 class MagicDirectory(Directory):
478     """A special directory that logically contains the set of all extant keep locators.
479
480     When a file is referenced by lookup(), it is tested to see if it is a valid
481     keep locator to a manifest, and if so, loads the manifest contents as a
482     subdirectory of this directory with the locator as the directory name.
483     Since querying a list of all extant keep locators is impractical, only
484     collections that have already been accessed are visible to readdir().
485
486     """
487
488     README_TEXT = """
489 This directory provides access to Arvados collections as subdirectories listed
490 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
491 the form '1234567890abcdefghijklmnopqrstuv+123').
492
493 Note that this directory will appear empty until you attempt to access a
494 specific collection subdirectory (such as trying to 'cd' into it), at which
495 point the collection will actually be looked up on the server and the directory
496 will appear if it exists.
497 """.lstrip()
498
499     def __init__(self, parent_inode, inodes, api, num_retries):
500         super(MagicDirectory, self).__init__(parent_inode, inodes)
501         self.api = api
502         self.num_retries = num_retries
503
504     def __setattr__(self, name, value):
505         super(MagicDirectory, self).__setattr__(name, value)
506         # When we're assigned an inode, add a README.
507         if ((name == 'inode') and (self.inode is not None) and
508               (not self._entries)):
509             self._entries['README'] = self.inodes.add_entry(
510                 StringFile(self.inode, self.README_TEXT, time.time()))
511             # If we're the root directory, add an identical by_id subdirectory.
512             if self.inode == llfuse.ROOT_INODE:
513                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
514                         self.inode, self.inodes, self.api, self.num_retries))
515
516     def __contains__(self, k):
517         if k in self._entries:
518             return True
519
520         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
521             return False
522
523         try:
524             e = self.inodes.add_entry(CollectionDirectory(
525                     self.inode, self.inodes, self.api, self.num_retries, k))
526
527             if e.update():
528                 if k not in self._entries:
529                     self._entries[k] = e
530                 else:
531                     self.inodes.del_entry(e)
532                 return True
533             else:
534                 self.inodes.del_entry(e)
535                 return False
536         except Exception as e:
537             _logger.debug('arv-mount exception keep %s', e)
538             self.inodes.del_entry(e)
539             return False
540
541     def __getitem__(self, item):
542         if item in self:
543             return self._entries[item]
544         else:
545             raise KeyError("No collection with id " + item)
546
547     def clear(self, force=False):
548         pass
549
550
551 class RecursiveInvalidateDirectory(Directory):
552     def invalidate(self):
553         try:
554             super(RecursiveInvalidateDirectory, self).invalidate()
555             for a in self._entries:
556                 self._entries[a].invalidate()
557         except Exception:
558             _logger.exception()
559
560
561 class TagsDirectory(RecursiveInvalidateDirectory):
562     """A special directory that contains as subdirectories all tags visible to the user."""
563
564     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
565         super(TagsDirectory, self).__init__(parent_inode, inodes)
566         self.api = api
567         self.num_retries = num_retries
568         self._poll = True
569         self._poll_time = poll_time
570
571     @use_counter
572     def update(self):
573         with llfuse.lock_released:
574             tags = self.api.links().list(
575                 filters=[['link_class', '=', 'tag']],
576                 select=['name'], distinct=True
577                 ).execute(num_retries=self.num_retries)
578         if "items" in tags:
579             self.merge(tags['items'],
580                        lambda i: i['name'],
581                        lambda a, i: a.tag == i['name'],
582                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
583
584
585 class TagDirectory(Directory):
586     """A special directory that contains as subdirectories all collections visible
587     to the user that are tagged with a particular tag.
588     """
589
590     def __init__(self, parent_inode, inodes, api, num_retries, tag,
591                  poll=False, poll_time=60):
592         super(TagDirectory, self).__init__(parent_inode, inodes)
593         self.api = api
594         self.num_retries = num_retries
595         self.tag = tag
596         self._poll = poll
597         self._poll_time = poll_time
598
599     @use_counter
600     def update(self):
601         with llfuse.lock_released:
602             taggedcollections = self.api.links().list(
603                 filters=[['link_class', '=', 'tag'],
604                          ['name', '=', self.tag],
605                          ['head_uuid', 'is_a', 'arvados#collection']],
606                 select=['head_uuid']
607                 ).execute(num_retries=self.num_retries)
608         self.merge(taggedcollections['items'],
609                    lambda i: i['head_uuid'],
610                    lambda a, i: a.collection_locator == i['head_uuid'],
611                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
612
613
614 class ProjectDirectory(Directory):
615     """A special directory that contains the contents of a project."""
616
617     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
618                  poll=False, poll_time=60):
619         super(ProjectDirectory, self).__init__(parent_inode, inodes)
620         self.api = api
621         self.num_retries = num_retries
622         self.project_object = project_object
623         self.project_object_file = None
624         self.project_uuid = project_object['uuid']
625         self._poll = poll
626         self._poll_time = poll_time
627         self._updating_lock = threading.Lock()
628         self._current_user = None
629
630     def createDirectory(self, i):
631         if collection_uuid_pattern.match(i['uuid']):
632             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
633         elif group_uuid_pattern.match(i['uuid']):
634             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
635         elif link_uuid_pattern.match(i['uuid']):
636             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
637                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
638             else:
639                 return None
640         elif uuid_pattern.match(i['uuid']):
641             return ObjectFile(self.parent_inode, i)
642         else:
643             return None
644
645     def uuid(self):
646         return self.project_uuid
647
648     @use_counter
649     def update(self):
650         if self.project_object_file == None:
651             self.project_object_file = ObjectFile(self.inode, self.project_object)
652             self.inodes.add_entry(self.project_object_file)
653
654         def namefn(i):
655             if 'name' in i:
656                 if i['name'] is None or len(i['name']) == 0:
657                     return None
658                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
659                     # collection or subproject
660                     return i['name']
661                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
662                     # name link
663                     return i['name']
664                 elif 'kind' in i and i['kind'].startswith('arvados#'):
665                     # something else
666                     return "{}.{}".format(i['name'], i['kind'][8:])
667             else:
668                 return None
669
670         def samefn(a, i):
671             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
672                 return a.uuid() == i['uuid']
673             elif isinstance(a, ObjectFile):
674                 return a.uuid() == i['uuid'] and not a.stale()
675             return False
676
677         try:
678             with llfuse.lock_released:
679                 self._updating_lock.acquire()
680                 if not self.stale():
681                     return
682
683                 if group_uuid_pattern.match(self.project_uuid):
684                     self.project_object = self.api.groups().get(
685                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
686                 elif user_uuid_pattern.match(self.project_uuid):
687                     self.project_object = self.api.users().get(
688                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
689
690                 contents = arvados.util.list_all(self.api.groups().contents,
691                                                  self.num_retries, uuid=self.project_uuid)
692
693             # end with llfuse.lock_released, re-acquire lock
694
695             self.merge(contents,
696                        namefn,
697                        samefn,
698                        self.createDirectory)
699         finally:
700             self._updating_lock.release()
701
702     @use_counter
703     @check_update
704     def __getitem__(self, item):
705         if item == '.arvados#project':
706             return self.project_object_file
707         else:
708             return super(ProjectDirectory, self).__getitem__(item)
709
710     def __contains__(self, k):
711         if k == '.arvados#project':
712             return True
713         else:
714             return super(ProjectDirectory, self).__contains__(k)
715
716     @use_counter
717     @check_update
718     def writable(self):
719         with llfuse.lock_released:
720             if not self._current_user:
721                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
722             return self._current_user["uuid"] in self.project_object["writable_by"]
723
724     def persisted(self):
725         return True
726
727     @use_counter
728     @check_update
729     def mkdir(self, name):
730         try:
731             with llfuse.lock_released:
732                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
733                                                     "name": name,
734                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
735             self.invalidate()
736         except apiclient_errors.Error as error:
737             _logger.error(error)
738             raise llfuse.FUSEError(errno.EEXIST)
739
740     @use_counter
741     @check_update
742     def rmdir(self, name):
743         if name not in self:
744             raise llfuse.FUSEError(errno.ENOENT)
745         if not isinstance(self[name], CollectionDirectory):
746             raise llfuse.FUSEError(errno.EPERM)
747         if len(self[name]) > 0:
748             raise llfuse.FUSEError(errno.ENOTEMPTY)
749         with llfuse.lock_released:
750             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
751         self.invalidate()
752
753     @use_counter
754     @check_update
755     def rename(self, name_old, name_new, src):
756         if not isinstance(src, ProjectDirectory):
757             raise llfuse.FUSEError(errno.EPERM)
758
759         ent = src[name_old]
760
761         if not isinstance(ent, CollectionDirectory):
762             raise llfuse.FUSEError(errno.EPERM)
763
764         if name_new in self:
765             # POSIX semantics for replacing one directory with another is
766             # tricky (the target directory must be empty, the operation must be
767             # atomic which isn't possible with the Arvados API as of this
768             # writing) so don't support that.
769             raise llfuse.FUSEError(errno.EPERM)
770
771         self.api.collections().update(uuid=ent.uuid(),
772                                       body={"owner_uuid": self.uuid(),
773                                             "name": name_new}).execute(num_retries=self.num_retries)
774
775         # Acually move the entry from source directory to this directory.
776         del src._entries[name_old]
777         self._entries[name_new] = ent
778         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
779
780
781 class SharedDirectory(Directory):
782     """A special directory that represents users or groups who have shared projects with me."""
783
784     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
785                  poll=False, poll_time=60):
786         super(SharedDirectory, self).__init__(parent_inode, inodes)
787         self.api = api
788         self.num_retries = num_retries
789         self.current_user = api.users().current().execute(num_retries=num_retries)
790         self._poll = True
791         self._poll_time = poll_time
792
793     @use_counter
794     def update(self):
795         with llfuse.lock_released:
796             all_projects = arvados.util.list_all(
797                 self.api.groups().list, self.num_retries,
798                 filters=[['group_class','=','project']])
799             objects = {}
800             for ob in all_projects:
801                 objects[ob['uuid']] = ob
802
803             roots = []
804             root_owners = {}
805             for ob in all_projects:
806                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
807                     roots.append(ob)
808                     root_owners[ob['owner_uuid']] = True
809
810             lusers = arvados.util.list_all(
811                 self.api.users().list, self.num_retries,
812                 filters=[['uuid','in', list(root_owners)]])
813             lgroups = arvados.util.list_all(
814                 self.api.groups().list, self.num_retries,
815                 filters=[['uuid','in', list(root_owners)]])
816
817             users = {}
818             groups = {}
819
820             for l in lusers:
821                 objects[l["uuid"]] = l
822             for l in lgroups:
823                 objects[l["uuid"]] = l
824
825             contents = {}
826             for r in root_owners:
827                 if r in objects:
828                     obr = objects[r]
829                     if obr.get("name"):
830                         contents[obr["name"]] = obr
831                     #elif obr.get("username"):
832                     #    contents[obr["username"]] = obr
833                     elif "first_name" in obr:
834                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
835
836
837             for r in roots:
838                 if r['owner_uuid'] not in objects:
839                     contents[r['name']] = r
840
841         # end with llfuse.lock_released, re-acquire lock
842
843         try:
844             self.merge(contents.items(),
845                        lambda i: i[0],
846                        lambda a, i: a.uuid() == i[1]['uuid'],
847                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
848         except Exception:
849             _logger.exception()