Merge branch 'master' into 8183-projects-dropdown
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
19 _logger = logging.getLogger('arvados.arvados_fuse')
20
21
22 # Match any character which FUSE or Linux cannot accommodate as part
23 # of a filename. (If present in a collection filename, they will
24 # appear as underscores in the fuse mount.)
25 _disallowed_filename_characters = re.compile('[\x00/]')
26
27 # '.' and '..' are not reachable if API server is newer than #6277
28 def sanitize_filename(dirty):
29     """Replace disallowed filename characters with harmless "_"."""
30     if dirty is None:
31         return None
32     elif dirty == '':
33         return '_'
34     elif dirty == '.':
35         return '_'
36     elif dirty == '..':
37         return '__'
38     else:
39         return _disallowed_filename_characters.sub('_', dirty)
40
41
42 class Directory(FreshBase):
43     """Generic directory object, backed by a dict.
44
45     Consists of a set of entries with the key representing the filename
46     and the value referencing a File or Directory object.
47     """
48
49     def __init__(self, parent_inode, inodes):
50         """parent_inode is the integer inode number"""
51
52         super(Directory, self).__init__()
53
54         self.inode = None
55         if not isinstance(parent_inode, int):
56             raise Exception("parent_inode should be an int")
57         self.parent_inode = parent_inode
58         self.inodes = inodes
59         self._entries = {}
60         self._mtime = time.time()
61
62     #  Overriden by subclasses to implement logic to update the entries dict
63     #  when the directory is stale
64     @use_counter
65     def update(self):
66         pass
67
68     # Only used when computing the size of the disk footprint of the directory
69     # (stub)
70     def size(self):
71         return 0
72
73     def persisted(self):
74         return False
75
76     def checkupdate(self):
77         if self.stale():
78             try:
79                 self.update()
80             except apiclient.errors.HttpError as e:
81                 _logger.warn(e)
82
83     @use_counter
84     @check_update
85     def __getitem__(self, item):
86         return self._entries[item]
87
88     @use_counter
89     @check_update
90     def items(self):
91         return list(self._entries.items())
92
93     @use_counter
94     @check_update
95     def __contains__(self, k):
96         return k in self._entries
97
98     @use_counter
99     @check_update
100     def __len__(self):
101         return len(self._entries)
102
103     def fresh(self):
104         self.inodes.touch(self)
105         super(Directory, self).fresh()
106
107     def merge(self, items, fn, same, new_entry):
108         """Helper method for updating the contents of the directory.
109
110         Takes a list describing the new contents of the directory, reuse
111         entries that are the same in both the old and new lists, create new
112         entries, and delete old entries missing from the new list.
113
114         :items: iterable with new directory contents
115
116         :fn: function to take an entry in 'items' and return the desired file or
117         directory name, or None if this entry should be skipped
118
119         :same: function to compare an existing entry (a File or Directory
120         object) with an entry in the items list to determine whether to keep
121         the existing entry.
122
123         :new_entry: function to create a new directory entry (File or Directory
124         object) from an entry in the items list.
125
126         """
127
128         oldentries = self._entries
129         self._entries = {}
130         changed = False
131         for i in items:
132             name = sanitize_filename(fn(i))
133             if name:
134                 if name in oldentries and same(oldentries[name], i):
135                     # move existing directory entry over
136                     self._entries[name] = oldentries[name]
137                     del oldentries[name]
138                 else:
139                     _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
140                     # create new directory entry
141                     ent = new_entry(i)
142                     if ent is not None:
143                         self._entries[name] = self.inodes.add_entry(ent)
144                         changed = True
145
146         # delete any other directory entries that were not in found in 'items'
147         for i in oldentries:
148             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
149             self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
150             self.inodes.del_entry(oldentries[i])
151             changed = True
152
153         if changed:
154             self.inodes.invalidate_inode(self.inode)
155             self._mtime = time.time()
156
157         self.fresh()
158
159     def clear(self, force=False):
160         """Delete all entries"""
161
162         if not self.in_use() or force:
163             oldentries = self._entries
164             self._entries = {}
165             for n in oldentries:
166                 if not oldentries[n].clear(force):
167                     self._entries = oldentries
168                     return False
169             for n in oldentries:
170                 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
171                 self.inodes.del_entry(oldentries[n])
172             self.inodes.invalidate_inode(self.inode)
173             self.invalidate()
174             return True
175         else:
176             return False
177
178     def mtime(self):
179         return self._mtime
180
181     def writable(self):
182         return False
183
184     def flush(self):
185         pass
186
187     def want_event_subscribe(self):
188         raise NotImplementedError()
189
190     def create(self, name):
191         raise NotImplementedError()
192
193     def mkdir(self, name):
194         raise NotImplementedError()
195
196     def unlink(self, name):
197         raise NotImplementedError()
198
199     def rmdir(self, name):
200         raise NotImplementedError()
201
202     def rename(self, name_old, name_new, src):
203         raise NotImplementedError()
204
205
206 class CollectionDirectoryBase(Directory):
207     """Represent an Arvados Collection as a directory.
208
209     This class is used for Subcollections, and is also the base class for
210     CollectionDirectory, which implements collection loading/saving on
211     Collection records.
212
213     Most operations act only the underlying Arvados `Collection` object.  The
214     `Collection` object signals via a notify callback to
215     `CollectionDirectoryBase.on_event` that an item was added, removed or
216     modified.  FUSE inodes and directory entries are created, deleted or
217     invalidated in response to these events.
218
219     """
220
221     def __init__(self, parent_inode, inodes, collection):
222         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
223         self.collection = collection
224
225     def new_entry(self, name, item, mtime):
226         name = sanitize_filename(name)
227         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
228             if item.fuse_entry.dead is not True:
229                 raise Exception("Can only reparent dead inode entry")
230             if item.fuse_entry.inode is None:
231                 raise Exception("Reparented entry must still have valid inode")
232             item.fuse_entry.dead = False
233             self._entries[name] = item.fuse_entry
234         elif isinstance(item, arvados.collection.RichCollectionBase):
235             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
236             self._entries[name].populate(mtime)
237         else:
238             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
239         item.fuse_entry = self._entries[name]
240
241     def on_event(self, event, collection, name, item):
242         if collection == self.collection:
243             name = sanitize_filename(name)
244             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
245             with llfuse.lock:
246                 if event == arvados.collection.ADD:
247                     self.new_entry(name, item, self.mtime())
248                 elif event == arvados.collection.DEL:
249                     ent = self._entries[name]
250                     del self._entries[name]
251                     self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
252                     self.inodes.del_entry(ent)
253                 elif event == arvados.collection.MOD:
254                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
255                         self.inodes.invalidate_inode(item.fuse_entry.inode)
256                     elif name in self._entries:
257                         self.inodes.invalidate_inode(self._entries[name].inode)
258
259     def populate(self, mtime):
260         self._mtime = mtime
261         self.collection.subscribe(self.on_event)
262         for entry, item in self.collection.items():
263             self.new_entry(entry, item, self.mtime())
264
265     def writable(self):
266         return self.collection.writable()
267
268     @use_counter
269     def flush(self):
270         with llfuse.lock_released:
271             self.collection.root_collection().save()
272
273     @use_counter
274     @check_update
275     def create(self, name):
276         with llfuse.lock_released:
277             self.collection.open(name, "w").close()
278
279     @use_counter
280     @check_update
281     def mkdir(self, name):
282         with llfuse.lock_released:
283             self.collection.mkdirs(name)
284
285     @use_counter
286     @check_update
287     def unlink(self, name):
288         with llfuse.lock_released:
289             self.collection.remove(name)
290         self.flush()
291
292     @use_counter
293     @check_update
294     def rmdir(self, name):
295         with llfuse.lock_released:
296             self.collection.remove(name)
297         self.flush()
298
299     @use_counter
300     @check_update
301     def rename(self, name_old, name_new, src):
302         if not isinstance(src, CollectionDirectoryBase):
303             raise llfuse.FUSEError(errno.EPERM)
304
305         if name_new in self:
306             ent = src[name_old]
307             tgt = self[name_new]
308             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
309                 pass
310             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
311                 if len(tgt) > 0:
312                     raise llfuse.FUSEError(errno.ENOTEMPTY)
313             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
314                 raise llfuse.FUSEError(errno.ENOTDIR)
315             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
316                 raise llfuse.FUSEError(errno.EISDIR)
317
318         with llfuse.lock_released:
319             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
320         self.flush()
321         src.flush()
322
323
324 class CollectionDirectory(CollectionDirectoryBase):
325     """Represents the root of a directory tree representing a collection."""
326
327     def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
328         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
329         self.api = api
330         self.num_retries = num_retries
331         self.collection_record_file = None
332         self.collection_record = None
333         self._poll = True
334         try:
335             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
336         except:
337             _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
338             self._poll_time = 60*60
339
340         if isinstance(collection_record, dict):
341             self.collection_locator = collection_record['uuid']
342             self._mtime = convertTime(collection_record.get('modified_at'))
343         else:
344             self.collection_locator = collection_record
345             self._mtime = 0
346         self._manifest_size = 0
347         if self.collection_locator:
348             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
349         self._updating_lock = threading.Lock()
350
351     def same(self, i):
352         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
353
354     def writable(self):
355         return self.collection.writable() if self.collection is not None else self._writable
356
357     def want_event_subscribe(self):
358         return (uuid_pattern.match(self.collection_locator) is not None)
359
360     # Used by arv-web.py to switch the contents of the CollectionDirectory
361     def change_collection(self, new_locator):
362         """Switch the contents of the CollectionDirectory.
363
364         Must be called with llfuse.lock held.
365         """
366
367         self.collection_locator = new_locator
368         self.collection_record = None
369         self.update()
370
371     def new_collection(self, new_collection_record, coll_reader):
372         if self.inode:
373             self.clear(force=True)
374
375         self.collection_record = new_collection_record
376
377         if self.collection_record:
378             self._mtime = convertTime(self.collection_record.get('modified_at'))
379             self.collection_locator = self.collection_record["uuid"]
380             if self.collection_record_file is not None:
381                 self.collection_record_file.update(self.collection_record)
382
383         self.collection = coll_reader
384         self.populate(self.mtime())
385
386     def uuid(self):
387         return self.collection_locator
388
389     @use_counter
390     def update(self, to_record_version=None):
391         try:
392             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
393                 return True
394
395             if self.collection_locator is None:
396                 self.fresh()
397                 return True
398
399             try:
400                 with llfuse.lock_released:
401                     self._updating_lock.acquire()
402                     if not self.stale():
403                         return
404
405                     _logger.debug("Updating %s", to_record_version)
406                     if self.collection is not None:
407                         if self.collection.known_past_version(to_record_version):
408                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
409                         else:
410                             self.collection.update()
411                     else:
412                         if uuid_pattern.match(self.collection_locator):
413                             coll_reader = arvados.collection.Collection(
414                                 self.collection_locator, self.api, self.api.keep,
415                                 num_retries=self.num_retries)
416                         else:
417                             coll_reader = arvados.collection.CollectionReader(
418                                 self.collection_locator, self.api, self.api.keep,
419                                 num_retries=self.num_retries)
420                         new_collection_record = coll_reader.api_response() or {}
421                         # If the Collection only exists in Keep, there will be no API
422                         # response.  Fill in the fields we need.
423                         if 'uuid' not in new_collection_record:
424                             new_collection_record['uuid'] = self.collection_locator
425                         if "portable_data_hash" not in new_collection_record:
426                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
427                         if 'manifest_text' not in new_collection_record:
428                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
429
430                         if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
431                             self.new_collection(new_collection_record, coll_reader)
432
433                         self._manifest_size = len(coll_reader.manifest_text())
434                         _logger.debug("%s manifest_size %i", self, self._manifest_size)
435                 # end with llfuse.lock_released, re-acquire lock
436
437                 self.fresh()
438                 return True
439             finally:
440                 self._updating_lock.release()
441         except arvados.errors.NotFoundError as e:
442             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
443         except arvados.errors.ArgumentError as detail:
444             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
445             if self.collection_record is not None and "manifest_text" in self.collection_record:
446                 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
447         except Exception:
448             _logger.exception("arv-mount %s: error", self.collection_locator)
449             if self.collection_record is not None and "manifest_text" in self.collection_record:
450                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
451         self.invalidate()
452         return False
453
454     @use_counter
455     @check_update
456     def __getitem__(self, item):
457         if item == '.arvados#collection':
458             if self.collection_record_file is None:
459                 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
460                 self.inodes.add_entry(self.collection_record_file)
461             return self.collection_record_file
462         else:
463             return super(CollectionDirectory, self).__getitem__(item)
464
465     def __contains__(self, k):
466         if k == '.arvados#collection':
467             return True
468         else:
469             return super(CollectionDirectory, self).__contains__(k)
470
471     def invalidate(self):
472         self.collection_record = None
473         self.collection_record_file = None
474         super(CollectionDirectory, self).invalidate()
475
476     def persisted(self):
477         return (self.collection_locator is not None)
478
479     def objsize(self):
480         # This is an empirically-derived heuristic to estimate the memory used
481         # to store this collection's metadata.  Calculating the memory
482         # footprint directly would be more accurate, but also more complicated.
483         return self._manifest_size * 128
484
485     def finalize(self):
486         if self.collection is not None:
487             if self.writable():
488                 self.collection.save()
489             self.collection.stop_threads()
490
491
492 class TmpCollectionDirectory(CollectionDirectoryBase):
493     """A directory backed by an Arvados collection that never gets saved.
494
495     This supports using Keep as scratch space. A userspace program can
496     read the .arvados#collection file to get a current manifest in
497     order to save a snapshot of the scratch data or use it as a crunch
498     job output.
499     """
500
501     class UnsaveableCollection(arvados.collection.Collection):
502         def save(self):
503             pass
504         def save_new(self):
505             pass
506
507     def __init__(self, parent_inode, inodes, api_client, num_retries):
508         collection = self.UnsaveableCollection(
509             api_client=api_client,
510             keep_client=api_client.keep,
511             num_retries=num_retries)
512         super(TmpCollectionDirectory, self).__init__(
513             parent_inode, inodes, collection)
514         self.collection_record_file = None
515         self.populate(self.mtime())
516
517     def on_event(self, *args, **kwargs):
518         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
519         if self.collection_record_file:
520             with llfuse.lock:
521                 self.collection_record_file.invalidate()
522             self.inodes.invalidate_inode(self.collection_record_file.inode)
523             _logger.debug("%s invalidated collection record", self)
524
525     def collection_record(self):
526         with llfuse.lock_released:
527             return {
528                 "uuid": None,
529                 "manifest_text": self.collection.manifest_text(),
530                 "portable_data_hash": self.collection.portable_data_hash(),
531             }
532
533     def __contains__(self, k):
534         return (k == '.arvados#collection' or
535                 super(TmpCollectionDirectory, self).__contains__(k))
536
537     @use_counter
538     def __getitem__(self, item):
539         if item == '.arvados#collection':
540             if self.collection_record_file is None:
541                 self.collection_record_file = FuncToJSONFile(
542                     self.inode, self.collection_record)
543                 self.inodes.add_entry(self.collection_record_file)
544             return self.collection_record_file
545         return super(TmpCollectionDirectory, self).__getitem__(item)
546
547     def persisted(self):
548         return False
549
550     def writable(self):
551         return True
552
553     def want_event_subscribe(self):
554         return False
555
556     def finalize(self):
557         self.collection.stop_threads()
558
559     def invalidate(self):
560         if self.collection_record_file:
561             self.collection_record_file.invalidate()
562         super(TmpCollectionDirectory, self).invalidate()
563
564
565 class MagicDirectory(Directory):
566     """A special directory that logically contains the set of all extant keep locators.
567
568     When a file is referenced by lookup(), it is tested to see if it is a valid
569     keep locator to a manifest, and if so, loads the manifest contents as a
570     subdirectory of this directory with the locator as the directory name.
571     Since querying a list of all extant keep locators is impractical, only
572     collections that have already been accessed are visible to readdir().
573
574     """
575
576     README_TEXT = """
577 This directory provides access to Arvados collections as subdirectories listed
578 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
579 the form '1234567890abcdef0123456789abcdef+123').
580
581 Note that this directory will appear empty until you attempt to access a
582 specific collection subdirectory (such as trying to 'cd' into it), at which
583 point the collection will actually be looked up on the server and the directory
584 will appear if it exists.
585
586 """.lstrip()
587
588     def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
589         super(MagicDirectory, self).__init__(parent_inode, inodes)
590         self.api = api
591         self.num_retries = num_retries
592         self.pdh_only = pdh_only
593
594     def __setattr__(self, name, value):
595         super(MagicDirectory, self).__setattr__(name, value)
596         # When we're assigned an inode, add a README.
597         if ((name == 'inode') and (self.inode is not None) and
598               (not self._entries)):
599             self._entries['README'] = self.inodes.add_entry(
600                 StringFile(self.inode, self.README_TEXT, time.time()))
601             # If we're the root directory, add an identical by_id subdirectory.
602             if self.inode == llfuse.ROOT_INODE:
603                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
604                         self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
605
606     def __contains__(self, k):
607         if k in self._entries:
608             return True
609
610         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
611             return False
612
613         try:
614             e = self.inodes.add_entry(CollectionDirectory(
615                     self.inode, self.inodes, self.api, self.num_retries, k))
616
617             if e.update():
618                 if k not in self._entries:
619                     self._entries[k] = e
620                 else:
621                     self.inodes.del_entry(e)
622                 return True
623             else:
624                 self.inodes.invalidate_entry(self.inode, k)
625                 self.inodes.del_entry(e)
626                 return False
627         except Exception as ex:
628             _logger.debug('arv-mount exception keep %s', ex)
629             self.inodes.del_entry(e)
630             return False
631
632     def __getitem__(self, item):
633         if item in self:
634             return self._entries[item]
635         else:
636             raise KeyError("No collection with id " + item)
637
638     def clear(self, force=False):
639         pass
640
641     def want_event_subscribe(self):
642         return not self.pdh_only
643
644
645 class RecursiveInvalidateDirectory(Directory):
646     def invalidate(self):
647         try:
648             super(RecursiveInvalidateDirectory, self).invalidate()
649             for a in self._entries:
650                 self._entries[a].invalidate()
651         except Exception:
652             _logger.exception()
653
654
655 class TagsDirectory(RecursiveInvalidateDirectory):
656     """A special directory that contains as subdirectories all tags visible to the user."""
657
658     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
659         super(TagsDirectory, self).__init__(parent_inode, inodes)
660         self.api = api
661         self.num_retries = num_retries
662         self._poll = True
663         self._poll_time = poll_time
664
665     def want_event_subscribe(self):
666         return True
667
668     @use_counter
669     def update(self):
670         with llfuse.lock_released:
671             tags = self.api.links().list(
672                 filters=[['link_class', '=', 'tag']],
673                 select=['name'], distinct=True
674                 ).execute(num_retries=self.num_retries)
675         if "items" in tags:
676             self.merge(tags['items'],
677                        lambda i: i['name'],
678                        lambda a, i: a.tag == i['name'],
679                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
680
681
682 class TagDirectory(Directory):
683     """A special directory that contains as subdirectories all collections visible
684     to the user that are tagged with a particular tag.
685     """
686
687     def __init__(self, parent_inode, inodes, api, num_retries, tag,
688                  poll=False, poll_time=60):
689         super(TagDirectory, self).__init__(parent_inode, inodes)
690         self.api = api
691         self.num_retries = num_retries
692         self.tag = tag
693         self._poll = poll
694         self._poll_time = poll_time
695
696     def want_event_subscribe(self):
697         return True
698
699     @use_counter
700     def update(self):
701         with llfuse.lock_released:
702             taggedcollections = self.api.links().list(
703                 filters=[['link_class', '=', 'tag'],
704                          ['name', '=', self.tag],
705                          ['head_uuid', 'is_a', 'arvados#collection']],
706                 select=['head_uuid']
707                 ).execute(num_retries=self.num_retries)
708         self.merge(taggedcollections['items'],
709                    lambda i: i['head_uuid'],
710                    lambda a, i: a.collection_locator == i['head_uuid'],
711                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
712
713
714 class ProjectDirectory(Directory):
715     """A special directory that contains the contents of a project."""
716
717     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
718                  poll=False, poll_time=60):
719         super(ProjectDirectory, self).__init__(parent_inode, inodes)
720         self.api = api
721         self.num_retries = num_retries
722         self.project_object = project_object
723         self.project_object_file = None
724         self.project_uuid = project_object['uuid']
725         self._poll = poll
726         self._poll_time = poll_time
727         self._updating_lock = threading.Lock()
728         self._current_user = None
729
730     def want_event_subscribe(self):
731         return True
732
733     def createDirectory(self, i):
734         if collection_uuid_pattern.match(i['uuid']):
735             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
736         elif group_uuid_pattern.match(i['uuid']):
737             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
738         elif link_uuid_pattern.match(i['uuid']):
739             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
740                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
741             else:
742                 return None
743         elif uuid_pattern.match(i['uuid']):
744             return ObjectFile(self.parent_inode, i)
745         else:
746             return None
747
748     def uuid(self):
749         return self.project_uuid
750
751     @use_counter
752     def update(self):
753         if self.project_object_file == None:
754             self.project_object_file = ObjectFile(self.inode, self.project_object)
755             self.inodes.add_entry(self.project_object_file)
756
757         def namefn(i):
758             if 'name' in i:
759                 if i['name'] is None or len(i['name']) == 0:
760                     return None
761                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
762                     # collection or subproject
763                     return i['name']
764                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
765                     # name link
766                     return i['name']
767                 elif 'kind' in i and i['kind'].startswith('arvados#'):
768                     # something else
769                     return "{}.{}".format(i['name'], i['kind'][8:])
770             else:
771                 return None
772
773         def samefn(a, i):
774             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
775                 return a.uuid() == i['uuid']
776             elif isinstance(a, ObjectFile):
777                 return a.uuid() == i['uuid'] and not a.stale()
778             return False
779
780         try:
781             with llfuse.lock_released:
782                 self._updating_lock.acquire()
783                 if not self.stale():
784                     return
785
786                 if group_uuid_pattern.match(self.project_uuid):
787                     self.project_object = self.api.groups().get(
788                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
789                 elif user_uuid_pattern.match(self.project_uuid):
790                     self.project_object = self.api.users().get(
791                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
792
793                 contents = arvados.util.list_all(self.api.groups().contents,
794                                                  self.num_retries, uuid=self.project_uuid)
795
796             # end with llfuse.lock_released, re-acquire lock
797
798             self.merge(contents,
799                        namefn,
800                        samefn,
801                        self.createDirectory)
802         finally:
803             self._updating_lock.release()
804
805     @use_counter
806     @check_update
807     def __getitem__(self, item):
808         if item == '.arvados#project':
809             return self.project_object_file
810         else:
811             return super(ProjectDirectory, self).__getitem__(item)
812
813     def __contains__(self, k):
814         if k == '.arvados#project':
815             return True
816         else:
817             return super(ProjectDirectory, self).__contains__(k)
818
819     @use_counter
820     @check_update
821     def writable(self):
822         with llfuse.lock_released:
823             if not self._current_user:
824                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
825             return self._current_user["uuid"] in self.project_object["writable_by"]
826
827     def persisted(self):
828         return True
829
830     @use_counter
831     @check_update
832     def mkdir(self, name):
833         try:
834             with llfuse.lock_released:
835                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
836                                                     "name": name,
837                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
838             self.invalidate()
839         except apiclient_errors.Error as error:
840             _logger.error(error)
841             raise llfuse.FUSEError(errno.EEXIST)
842
843     @use_counter
844     @check_update
845     def rmdir(self, name):
846         if name not in self:
847             raise llfuse.FUSEError(errno.ENOENT)
848         if not isinstance(self[name], CollectionDirectory):
849             raise llfuse.FUSEError(errno.EPERM)
850         if len(self[name]) > 0:
851             raise llfuse.FUSEError(errno.ENOTEMPTY)
852         with llfuse.lock_released:
853             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
854         self.invalidate()
855
856     @use_counter
857     @check_update
858     def rename(self, name_old, name_new, src):
859         if not isinstance(src, ProjectDirectory):
860             raise llfuse.FUSEError(errno.EPERM)
861
862         ent = src[name_old]
863
864         if not isinstance(ent, CollectionDirectory):
865             raise llfuse.FUSEError(errno.EPERM)
866
867         if name_new in self:
868             # POSIX semantics for replacing one directory with another is
869             # tricky (the target directory must be empty, the operation must be
870             # atomic which isn't possible with the Arvados API as of this
871             # writing) so don't support that.
872             raise llfuse.FUSEError(errno.EPERM)
873
874         self.api.collections().update(uuid=ent.uuid(),
875                                       body={"owner_uuid": self.uuid(),
876                                             "name": name_new}).execute(num_retries=self.num_retries)
877
878         # Acually move the entry from source directory to this directory.
879         del src._entries[name_old]
880         self._entries[name_new] = ent
881         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
882
883
884 class SharedDirectory(Directory):
885     """A special directory that represents users or groups who have shared projects with me."""
886
887     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
888                  poll=False, poll_time=60):
889         super(SharedDirectory, self).__init__(parent_inode, inodes)
890         self.api = api
891         self.num_retries = num_retries
892         self.current_user = api.users().current().execute(num_retries=num_retries)
893         self._poll = True
894         self._poll_time = poll_time
895
896     @use_counter
897     def update(self):
898         with llfuse.lock_released:
899             all_projects = arvados.util.list_all(
900                 self.api.groups().list, self.num_retries,
901                 filters=[['group_class','=','project']])
902             objects = {}
903             for ob in all_projects:
904                 objects[ob['uuid']] = ob
905
906             roots = []
907             root_owners = {}
908             for ob in all_projects:
909                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
910                     roots.append(ob)
911                     root_owners[ob['owner_uuid']] = True
912
913             lusers = arvados.util.list_all(
914                 self.api.users().list, self.num_retries,
915                 filters=[['uuid','in', list(root_owners)]])
916             lgroups = arvados.util.list_all(
917                 self.api.groups().list, self.num_retries,
918                 filters=[['uuid','in', list(root_owners)]])
919
920             users = {}
921             groups = {}
922
923             for l in lusers:
924                 objects[l["uuid"]] = l
925             for l in lgroups:
926                 objects[l["uuid"]] = l
927
928             contents = {}
929             for r in root_owners:
930                 if r in objects:
931                     obr = objects[r]
932                     if obr.get("name"):
933                         contents[obr["name"]] = obr
934                     #elif obr.get("username"):
935                     #    contents[obr["username"]] = obr
936                     elif "first_name" in obr:
937                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
938
939
940             for r in roots:
941                 if r['owner_uuid'] not in objects:
942                     contents[r['name']] = r
943
944         # end with llfuse.lock_released, re-acquire lock
945
946         try:
947             self.merge(contents.items(),
948                        lambda i: i[0],
949                        lambda a, i: a.uuid() == i[1]['uuid'],
950                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
951         except Exception:
952             _logger.exception()
953
954     def want_event_subscribe(self):
955         return True