8285: Add want_event_subscribe flag to subclasses of fusedir.Directory,
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
19 _logger = logging.getLogger('arvados.arvados_fuse')
20
21
22 # Match any character which FUSE or Linux cannot accommodate as part
23 # of a filename. (If present in a collection filename, they will
24 # appear as underscores in the fuse mount.)
25 _disallowed_filename_characters = re.compile('[\x00/]')
26
27 # '.' and '..' are not reachable if API server is newer than #6277
28 def sanitize_filename(dirty):
29     """Replace disallowed filename characters with harmless "_"."""
30     if dirty is None:
31         return None
32     elif dirty == '':
33         return '_'
34     elif dirty == '.':
35         return '_'
36     elif dirty == '..':
37         return '__'
38     else:
39         return _disallowed_filename_characters.sub('_', dirty)
40
41
42 class Directory(FreshBase):
43     """Generic directory object, backed by a dict.
44
45     Consists of a set of entries with the key representing the filename
46     and the value referencing a File or Directory object.
47     """
48
49     def __init__(self, parent_inode, inodes):
50         """parent_inode is the integer inode number"""
51
52         super(Directory, self).__init__()
53
54         self.inode = None
55         if not isinstance(parent_inode, int):
56             raise Exception("parent_inode should be an int")
57         self.parent_inode = parent_inode
58         self.inodes = inodes
59         self._entries = {}
60         self._mtime = time.time()
61
62     #  Overriden by subclasses to implement logic to update the entries dict
63     #  when the directory is stale
64     @use_counter
65     def update(self):
66         pass
67
68     # Only used when computing the size of the disk footprint of the directory
69     # (stub)
70     def size(self):
71         return 0
72
73     def persisted(self):
74         return False
75
76     def checkupdate(self):
77         if self.stale():
78             try:
79                 self.update()
80             except apiclient.errors.HttpError as e:
81                 _logger.warn(e)
82
83     @use_counter
84     @check_update
85     def __getitem__(self, item):
86         return self._entries[item]
87
88     @use_counter
89     @check_update
90     def items(self):
91         return list(self._entries.items())
92
93     @use_counter
94     @check_update
95     def __contains__(self, k):
96         return k in self._entries
97
98     @use_counter
99     @check_update
100     def __len__(self):
101         return len(self._entries)
102
103     def fresh(self):
104         self.inodes.touch(self)
105         super(Directory, self).fresh()
106
107     def merge(self, items, fn, same, new_entry):
108         """Helper method for updating the contents of the directory.
109
110         Takes a list describing the new contents of the directory, reuse
111         entries that are the same in both the old and new lists, create new
112         entries, and delete old entries missing from the new list.
113
114         :items: iterable with new directory contents
115
116         :fn: function to take an entry in 'items' and return the desired file or
117         directory name, or None if this entry should be skipped
118
119         :same: function to compare an existing entry (a File or Directory
120         object) with an entry in the items list to determine whether to keep
121         the existing entry.
122
123         :new_entry: function to create a new directory entry (File or Directory
124         object) from an entry in the items list.
125
126         """
127
128         oldentries = self._entries
129         self._entries = {}
130         changed = False
131         for i in items:
132             name = sanitize_filename(fn(i))
133             if name:
134                 if name in oldentries and same(oldentries[name], i):
135                     # move existing directory entry over
136                     self._entries[name] = oldentries[name]
137                     del oldentries[name]
138                 else:
139                     _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
140                     # create new directory entry
141                     ent = new_entry(i)
142                     if ent is not None:
143                         self._entries[name] = self.inodes.add_entry(ent)
144                         changed = True
145
146         # delete any other directory entries that were not in found in 'items'
147         for i in oldentries:
148             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
149             self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
150             self.inodes.del_entry(oldentries[i])
151             changed = True
152
153         if changed:
154             self.inodes.invalidate_inode(self.inode)
155             self._mtime = time.time()
156
157         self.fresh()
158
159     def clear(self, force=False):
160         """Delete all entries"""
161
162         if not self.in_use() or force:
163             oldentries = self._entries
164             self._entries = {}
165             for n in oldentries:
166                 if not oldentries[n].clear(force):
167                     self._entries = oldentries
168                     return False
169             for n in oldentries:
170                 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
171                 self.inodes.del_entry(oldentries[n])
172             self.inodes.invalidate_inode(self.inode)
173             self.invalidate()
174             return True
175         else:
176             return False
177
178     def mtime(self):
179         return self._mtime
180
181     def writable(self):
182         return False
183
184     def flush(self):
185         pass
186
187     def want_event_subscribe(self):
188         raise NotImplementedError()
189
190     def create(self, name):
191         raise NotImplementedError()
192
193     def mkdir(self, name):
194         raise NotImplementedError()
195
196     def unlink(self, name):
197         raise NotImplementedError()
198
199     def rmdir(self, name):
200         raise NotImplementedError()
201
202     def rename(self, name_old, name_new, src):
203         raise NotImplementedError()
204
205
206 class CollectionDirectoryBase(Directory):
207     """Represent an Arvados Collection as a directory.
208
209     This class is used for Subcollections, and is also the base class for
210     CollectionDirectory, which implements collection loading/saving on
211     Collection records.
212
213     Most operations act only the underlying Arvados `Collection` object.  The
214     `Collection` object signals via a notify callback to
215     `CollectionDirectoryBase.on_event` that an item was added, removed or
216     modified.  FUSE inodes and directory entries are created, deleted or
217     invalidated in response to these events.
218
219     """
220
221     def __init__(self, parent_inode, inodes, collection):
222         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
223         self.collection = collection
224
225     def new_entry(self, name, item, mtime):
226         name = sanitize_filename(name)
227         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
228             if item.fuse_entry.dead is not True:
229                 raise Exception("Can only reparent dead inode entry")
230             if item.fuse_entry.inode is None:
231                 raise Exception("Reparented entry must still have valid inode")
232             item.fuse_entry.dead = False
233             self._entries[name] = item.fuse_entry
234         elif isinstance(item, arvados.collection.RichCollectionBase):
235             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
236             self._entries[name].populate(mtime)
237         else:
238             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
239         item.fuse_entry = self._entries[name]
240
241     def on_event(self, event, collection, name, item):
242         if collection == self.collection:
243             name = sanitize_filename(name)
244             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
245             with llfuse.lock:
246                 if event == arvados.collection.ADD:
247                     self.new_entry(name, item, self.mtime())
248                 elif event == arvados.collection.DEL:
249                     ent = self._entries[name]
250                     del self._entries[name]
251                     self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
252                     self.inodes.del_entry(ent)
253                 elif event == arvados.collection.MOD:
254                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
255                         self.inodes.invalidate_inode(item.fuse_entry.inode)
256                     elif name in self._entries:
257                         self.inodes.invalidate_inode(self._entries[name].inode)
258
259     def populate(self, mtime):
260         self._mtime = mtime
261         self.collection.subscribe(self.on_event)
262         for entry, item in self.collection.items():
263             self.new_entry(entry, item, self.mtime())
264
265     def writable(self):
266         return self.collection.writable()
267
268     @use_counter
269     def flush(self):
270         with llfuse.lock_released:
271             self.collection.root_collection().save()
272
273     @use_counter
274     @check_update
275     def create(self, name):
276         with llfuse.lock_released:
277             self.collection.open(name, "w").close()
278
279     @use_counter
280     @check_update
281     def mkdir(self, name):
282         with llfuse.lock_released:
283             self.collection.mkdirs(name)
284
285     @use_counter
286     @check_update
287     def unlink(self, name):
288         with llfuse.lock_released:
289             self.collection.remove(name)
290         self.flush()
291
292     @use_counter
293     @check_update
294     def rmdir(self, name):
295         with llfuse.lock_released:
296             self.collection.remove(name)
297         self.flush()
298
299     @use_counter
300     @check_update
301     def rename(self, name_old, name_new, src):
302         if not isinstance(src, CollectionDirectoryBase):
303             raise llfuse.FUSEError(errno.EPERM)
304
305         if name_new in self:
306             ent = src[name_old]
307             tgt = self[name_new]
308             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
309                 pass
310             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
311                 if len(tgt) > 0:
312                     raise llfuse.FUSEError(errno.ENOTEMPTY)
313             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
314                 raise llfuse.FUSEError(errno.ENOTDIR)
315             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
316                 raise llfuse.FUSEError(errno.EISDIR)
317
318         with llfuse.lock_released:
319             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
320         self.flush()
321         src.flush()
322
323
324 class CollectionDirectory(CollectionDirectoryBase):
325     """Represents the root of a directory tree representing a collection."""
326
327     def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
328         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
329         self.api = api
330         self.num_retries = num_retries
331         self.collection_record_file = None
332         self.collection_record = None
333         self._poll = True
334         try:
335             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
336         except:
337             _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
338             self._poll_time = 60*60
339
340         if isinstance(collection_record, dict):
341             self.collection_locator = collection_record['uuid']
342             self._mtime = convertTime(collection_record.get('modified_at'))
343         else:
344             self.collection_locator = collection_record
345             self._mtime = 0
346         self._manifest_size = 0
347         if self.collection_locator:
348             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
349         self._updating_lock = threading.Lock()
350
351     def same(self, i):
352         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
353
354     def writable(self):
355         return self.collection.writable() if self.collection is not None else self._writable
356
357     def want_event_subscribe(self):
358         return (uuid_pattern.match(self.collection_locator) is not None)
359
360     # Used by arv-web.py to switch the contents of the CollectionDirectory
361     def change_collection(self, new_locator):
362         """Switch the contents of the CollectionDirectory.
363
364         Must be called with llfuse.lock held.
365         """
366
367         self.collection_locator = new_locator
368         self.collection_record = None
369         self.update()
370
371     def new_collection(self, new_collection_record, coll_reader):
372         if self.inode:
373             self.clear(force=True)
374
375         self.collection_record = new_collection_record
376
377         if self.collection_record:
378             self._mtime = convertTime(self.collection_record.get('modified_at'))
379             self.collection_locator = self.collection_record["uuid"]
380             if self.collection_record_file is not None:
381                 self.collection_record_file.update(self.collection_record)
382
383         self.collection = coll_reader
384         self.populate(self.mtime())
385
386     def uuid(self):
387         return self.collection_locator
388
389     @use_counter
390     def update(self, to_record_version=None):
391         try:
392             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
393                 return True
394
395             if self.collection_locator is None:
396                 self.fresh()
397                 return True
398
399             try:
400                 with llfuse.lock_released:
401                     self._updating_lock.acquire()
402                     if not self.stale():
403                         return
404
405                     _logger.debug("Updating %s", to_record_version)
406                     if self.collection is not None:
407                         if self.collection.known_past_version(to_record_version):
408                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
409                         else:
410                             self.collection.update()
411                     else:
412                         if uuid_pattern.match(self.collection_locator):
413                             coll_reader = arvados.collection.Collection(
414                                 self.collection_locator, self.api, self.api.keep,
415                                 num_retries=self.num_retries)
416                         else:
417                             coll_reader = arvados.collection.CollectionReader(
418                                 self.collection_locator, self.api, self.api.keep,
419                                 num_retries=self.num_retries)
420                         new_collection_record = coll_reader.api_response() or {}
421                         # If the Collection only exists in Keep, there will be no API
422                         # response.  Fill in the fields we need.
423                         if 'uuid' not in new_collection_record:
424                             new_collection_record['uuid'] = self.collection_locator
425                         if "portable_data_hash" not in new_collection_record:
426                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
427                         if 'manifest_text' not in new_collection_record:
428                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
429
430                         if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
431                             self.new_collection(new_collection_record, coll_reader)
432
433                         self._manifest_size = len(coll_reader.manifest_text())
434                         _logger.debug("%s manifest_size %i", self, self._manifest_size)
435                 # end with llfuse.lock_released, re-acquire lock
436
437                 self.fresh()
438                 return True
439             finally:
440                 self._updating_lock.release()
441         except arvados.errors.NotFoundError as e:
442             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
443         except arvados.errors.ArgumentError as detail:
444             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
445             if self.collection_record is not None and "manifest_text" in self.collection_record:
446                 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
447         except Exception:
448             _logger.exception("arv-mount %s: error", self.collection_locator)
449             if self.collection_record is not None and "manifest_text" in self.collection_record:
450                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
451         self.invalidate()
452         return False
453
454     @use_counter
455     @check_update
456     def __getitem__(self, item):
457         if item == '.arvados#collection':
458             if self.collection_record_file is None:
459                 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
460                 self.inodes.add_entry(self.collection_record_file)
461             return self.collection_record_file
462         else:
463             return super(CollectionDirectory, self).__getitem__(item)
464
465     def __contains__(self, k):
466         if k == '.arvados#collection':
467             return True
468         else:
469             return super(CollectionDirectory, self).__contains__(k)
470
471     def invalidate(self):
472         self.collection_record = None
473         self.collection_record_file = None
474         super(CollectionDirectory, self).invalidate()
475
476     def persisted(self):
477         return (self.collection_locator is not None)
478
479     def objsize(self):
480         # This is an empirically-derived heuristic to estimate the memory used
481         # to store this collection's metadata.  Calculating the memory
482         # footprint directly would be more accurate, but also more complicated.
483         return self._manifest_size * 128
484
485     def finalize(self):
486         if self.collection is not None:
487             if self.writable():
488                 self.collection.save()
489             self.collection.stop_threads()
490
491
492 class TmpCollectionDirectory(CollectionDirectoryBase):
493     """A directory backed by an Arvados collection that never gets saved.
494
495     This supports using Keep as scratch space. A userspace program can
496     read the .arvados#collection file to get a current manifest in
497     order to save a snapshot of the scratch data or use it as a crunch
498     job output.
499     """
500
501     class UnsaveableCollection(arvados.collection.Collection):
502         def save(self):
503             pass
504         def save_new(self):
505             pass
506
507     def __init__(self, parent_inode, inodes, api_client, num_retries):
508         collection = self.UnsaveableCollection(
509             api_client=api_client,
510             keep_client=api_client.keep,
511             num_retries=num_retries)
512         super(TmpCollectionDirectory, self).__init__(
513             parent_inode, inodes, collection)
514         self.collection_record_file = None
515         self.populate(self.mtime())
516
517     def on_event(self, *args, **kwargs):
518         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
519         if self.collection_record_file:
520             with llfuse.lock:
521                 self.collection_record_file.invalidate()
522             self.inodes.invalidate_inode(self.collection_record_file.inode)
523             _logger.debug("%s invalidated collection record", self)
524
525     def collection_record(self):
526         with llfuse.lock_released:
527             return {
528                 "uuid": None,
529                 "manifest_text": self.collection.manifest_text(),
530                 "portable_data_hash": self.collection.portable_data_hash(),
531             }
532
533     def __contains__(self, k):
534         return (k == '.arvados#collection' or
535                 super(TmpCollectionDirectory, self).__contains__(k))
536
537     @use_counter
538     def __getitem__(self, item):
539         if item == '.arvados#collection':
540             if self.collection_record_file is None:
541                 self.collection_record_file = FuncToJSONFile(
542                     self.inode, self.collection_record)
543                 self.inodes.add_entry(self.collection_record_file)
544             return self.collection_record_file
545         return super(TmpCollectionDirectory, self).__getitem__(item)
546
547     def persisted(self):
548         return False
549
550     def writable(self):
551         return True
552
553     def want_event_subscribe(self):
554         return False
555
556     def finalize(self):
557         self.collection.stop_threads()
558
559     def invalidate(self):
560         if self.collection_record_file:
561             self.collection_record_file.invalidate()
562         super(TmpCollectionDirectory, self).invalidate()
563
564
565 class MagicDirectory(Directory):
566     """A special directory that logically contains the set of all extant keep locators.
567
568     When a file is referenced by lookup(), it is tested to see if it is a valid
569     keep locator to a manifest, and if so, loads the manifest contents as a
570     subdirectory of this directory with the locator as the directory name.
571     Since querying a list of all extant keep locators is impractical, only
572     collections that have already been accessed are visible to readdir().
573
574     """
575
576     README_TEXT = """
577 This directory provides access to Arvados collections as subdirectories listed
578 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
579 the form '1234567890abcdef0123456789abcdef+123').
580
581 Note that this directory will appear empty until you attempt to access a
582 specific collection subdirectory (such as trying to 'cd' into it), at which
583 point the collection will actually be looked up on the server and the directory
584 will appear if it exists.
585
586 """.lstrip()
587
588     def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
589         super(MagicDirectory, self).__init__(parent_inode, inodes)
590         self.api = api
591         self.num_retries = num_retries
592         self.pdh_only = pdh_only
593
594     def __setattr__(self, name, value):
595         super(MagicDirectory, self).__setattr__(name, value)
596         # When we're assigned an inode, add a README.
597         if ((name == 'inode') and (self.inode is not None) and
598               (not self._entries)):
599             self._entries['README'] = self.inodes.add_entry(
600                 StringFile(self.inode, self.README_TEXT, time.time()))
601             # If we're the root directory, add an identical by_id subdirectory.
602             if self.inode == llfuse.ROOT_INODE:
603                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
604                         self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
605
606     def __contains__(self, k):
607         if k in self._entries:
608             return True
609
610         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
611             return False
612
613         try:
614             e = self.inodes.add_entry(CollectionDirectory(
615                     self.inode, self.inodes, self.api, self.num_retries, k))
616
617             if e.update():
618                 if k not in self._entries:
619                     self._entries[k] = e
620                 else:
621                     self.inodes.del_entry(e)
622                 return True
623             else:
624                 self.inodes.invalidate_entry(self.inode, k)
625                 self.inodes.del_entry(e)
626                 return False
627         except Exception as ex:
628             _logger.debug('arv-mount exception keep %s', ex)
629             self.inodes.del_entry(e)
630             return False
631
632     def __getitem__(self, item):
633         if item in self:
634             return self._entries[item]
635         else:
636             raise KeyError("No collection with id " + item)
637
638     def clear(self, force=False):
639         pass
640
641     def want_event_subscribe(self):
642         return not self.pdh_only
643
644 class RecursiveInvalidateDirectory(Directory):
645     def invalidate(self):
646         try:
647             super(RecursiveInvalidateDirectory, self).invalidate()
648             for a in self._entries:
649                 self._entries[a].invalidate()
650         except Exception:
651             _logger.exception()
652
653
654 class TagsDirectory(RecursiveInvalidateDirectory):
655     """A special directory that contains as subdirectories all tags visible to the user."""
656
657     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
658         super(TagsDirectory, self).__init__(parent_inode, inodes)
659         self.api = api
660         self.num_retries = num_retries
661         self._poll = True
662         self._poll_time = poll_time
663
664     @use_counter
665     def update(self):
666         with llfuse.lock_released:
667             tags = self.api.links().list(
668                 filters=[['link_class', '=', 'tag']],
669                 select=['name'], distinct=True
670                 ).execute(num_retries=self.num_retries)
671         if "items" in tags:
672             self.merge(tags['items'],
673                        lambda i: i['name'],
674                        lambda a, i: a.tag == i['name'],
675                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
676
677
678 class TagDirectory(Directory):
679     """A special directory that contains as subdirectories all collections visible
680     to the user that are tagged with a particular tag.
681     """
682
683     def __init__(self, parent_inode, inodes, api, num_retries, tag,
684                  poll=False, poll_time=60):
685         super(TagDirectory, self).__init__(parent_inode, inodes)
686         self.api = api
687         self.num_retries = num_retries
688         self.tag = tag
689         self._poll = poll
690         self._poll_time = poll_time
691
692     def want_event_subscribe(self):
693         return True
694
695     @use_counter
696     def update(self):
697         with llfuse.lock_released:
698             taggedcollections = self.api.links().list(
699                 filters=[['link_class', '=', 'tag'],
700                          ['name', '=', self.tag],
701                          ['head_uuid', 'is_a', 'arvados#collection']],
702                 select=['head_uuid']
703                 ).execute(num_retries=self.num_retries)
704         self.merge(taggedcollections['items'],
705                    lambda i: i['head_uuid'],
706                    lambda a, i: a.collection_locator == i['head_uuid'],
707                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
708
709
710 class ProjectDirectory(Directory):
711     """A special directory that contains the contents of a project."""
712
713     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
714                  poll=False, poll_time=60):
715         super(ProjectDirectory, self).__init__(parent_inode, inodes)
716         self.api = api
717         self.num_retries = num_retries
718         self.project_object = project_object
719         self.project_object_file = None
720         self.project_uuid = project_object['uuid']
721         self._poll = poll
722         self._poll_time = poll_time
723         self._updating_lock = threading.Lock()
724         self._current_user = None
725
726     def want_event_subscribe(self):
727         return True
728
729     def createDirectory(self, i):
730         if collection_uuid_pattern.match(i['uuid']):
731             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
732         elif group_uuid_pattern.match(i['uuid']):
733             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
734         elif link_uuid_pattern.match(i['uuid']):
735             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
736                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
737             else:
738                 return None
739         elif uuid_pattern.match(i['uuid']):
740             return ObjectFile(self.parent_inode, i)
741         else:
742             return None
743
744     def uuid(self):
745         return self.project_uuid
746
747     @use_counter
748     def update(self):
749         if self.project_object_file == None:
750             self.project_object_file = ObjectFile(self.inode, self.project_object)
751             self.inodes.add_entry(self.project_object_file)
752
753         def namefn(i):
754             if 'name' in i:
755                 if i['name'] is None or len(i['name']) == 0:
756                     return None
757                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
758                     # collection or subproject
759                     return i['name']
760                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
761                     # name link
762                     return i['name']
763                 elif 'kind' in i and i['kind'].startswith('arvados#'):
764                     # something else
765                     return "{}.{}".format(i['name'], i['kind'][8:])
766             else:
767                 return None
768
769         def samefn(a, i):
770             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
771                 return a.uuid() == i['uuid']
772             elif isinstance(a, ObjectFile):
773                 return a.uuid() == i['uuid'] and not a.stale()
774             return False
775
776         try:
777             with llfuse.lock_released:
778                 self._updating_lock.acquire()
779                 if not self.stale():
780                     return
781
782                 if group_uuid_pattern.match(self.project_uuid):
783                     self.project_object = self.api.groups().get(
784                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
785                 elif user_uuid_pattern.match(self.project_uuid):
786                     self.project_object = self.api.users().get(
787                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
788
789                 contents = arvados.util.list_all(self.api.groups().contents,
790                                                  self.num_retries, uuid=self.project_uuid)
791
792             # end with llfuse.lock_released, re-acquire lock
793
794             self.merge(contents,
795                        namefn,
796                        samefn,
797                        self.createDirectory)
798         finally:
799             self._updating_lock.release()
800
801     @use_counter
802     @check_update
803     def __getitem__(self, item):
804         if item == '.arvados#project':
805             return self.project_object_file
806         else:
807             return super(ProjectDirectory, self).__getitem__(item)
808
809     def __contains__(self, k):
810         if k == '.arvados#project':
811             return True
812         else:
813             return super(ProjectDirectory, self).__contains__(k)
814
815     @use_counter
816     @check_update
817     def writable(self):
818         with llfuse.lock_released:
819             if not self._current_user:
820                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
821             return self._current_user["uuid"] in self.project_object["writable_by"]
822
823     def persisted(self):
824         return True
825
826     @use_counter
827     @check_update
828     def mkdir(self, name):
829         try:
830             with llfuse.lock_released:
831                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
832                                                     "name": name,
833                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
834             self.invalidate()
835         except apiclient_errors.Error as error:
836             _logger.error(error)
837             raise llfuse.FUSEError(errno.EEXIST)
838
839     @use_counter
840     @check_update
841     def rmdir(self, name):
842         if name not in self:
843             raise llfuse.FUSEError(errno.ENOENT)
844         if not isinstance(self[name], CollectionDirectory):
845             raise llfuse.FUSEError(errno.EPERM)
846         if len(self[name]) > 0:
847             raise llfuse.FUSEError(errno.ENOTEMPTY)
848         with llfuse.lock_released:
849             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
850         self.invalidate()
851
852     @use_counter
853     @check_update
854     def rename(self, name_old, name_new, src):
855         if not isinstance(src, ProjectDirectory):
856             raise llfuse.FUSEError(errno.EPERM)
857
858         ent = src[name_old]
859
860         if not isinstance(ent, CollectionDirectory):
861             raise llfuse.FUSEError(errno.EPERM)
862
863         if name_new in self:
864             # POSIX semantics for replacing one directory with another is
865             # tricky (the target directory must be empty, the operation must be
866             # atomic which isn't possible with the Arvados API as of this
867             # writing) so don't support that.
868             raise llfuse.FUSEError(errno.EPERM)
869
870         self.api.collections().update(uuid=ent.uuid(),
871                                       body={"owner_uuid": self.uuid(),
872                                             "name": name_new}).execute(num_retries=self.num_retries)
873
874         # Acually move the entry from source directory to this directory.
875         del src._entries[name_old]
876         self._entries[name_new] = ent
877         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
878
879
880 class SharedDirectory(Directory):
881     """A special directory that represents users or groups who have shared projects with me."""
882
883     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
884                  poll=False, poll_time=60):
885         super(SharedDirectory, self).__init__(parent_inode, inodes)
886         self.api = api
887         self.num_retries = num_retries
888         self.current_user = api.users().current().execute(num_retries=num_retries)
889         self._poll = True
890         self._poll_time = poll_time
891
892     @use_counter
893     def update(self):
894         with llfuse.lock_released:
895             all_projects = arvados.util.list_all(
896                 self.api.groups().list, self.num_retries,
897                 filters=[['group_class','=','project']])
898             objects = {}
899             for ob in all_projects:
900                 objects[ob['uuid']] = ob
901
902             roots = []
903             root_owners = {}
904             for ob in all_projects:
905                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
906                     roots.append(ob)
907                     root_owners[ob['owner_uuid']] = True
908
909             lusers = arvados.util.list_all(
910                 self.api.users().list, self.num_retries,
911                 filters=[['uuid','in', list(root_owners)]])
912             lgroups = arvados.util.list_all(
913                 self.api.groups().list, self.num_retries,
914                 filters=[['uuid','in', list(root_owners)]])
915
916             users = {}
917             groups = {}
918
919             for l in lusers:
920                 objects[l["uuid"]] = l
921             for l in lgroups:
922                 objects[l["uuid"]] = l
923
924             contents = {}
925             for r in root_owners:
926                 if r in objects:
927                     obr = objects[r]
928                     if obr.get("name"):
929                         contents[obr["name"]] = obr
930                     #elif obr.get("username"):
931                     #    contents[obr["username"]] = obr
932                     elif "first_name" in obr:
933                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
934
935
936             for r in roots:
937                 if r['owner_uuid'] not in objects:
938                     contents[r['name']] = r
939
940         # end with llfuse.lock_released, re-acquire lock
941
942         try:
943             self.merge(contents.items(),
944                        lambda i: i[0],
945                        lambda a, i: a.uuid() == i[1]['uuid'],
946                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
947         except Exception:
948             _logger.exception()