Merge branch '10722-cwl-subworkflow' closes #10722
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
19 _logger = logging.getLogger('arvados.arvados_fuse')
20
21
# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile(r'[\x00/]')

# Whole names that cannot be used verbatim as a directory entry, mapped to
# their replacements.  ('.' and '..' are not reachable if API server is
# newer than #6277.)
_reserved_filename_replacements = {'': '_', '.': '_', '..': '__'}


def sanitize_filename(dirty):
    """Return *dirty* made safe for use as a FUSE directory entry name.

    None is returned unchanged.  The empty string and '.' become '_',
    '..' becomes '__', and in any other name each NUL or '/' character
    is replaced by '_'.
    """
    if dirty is None:
        return None
    replacement = _reserved_filename_replacements.get(dirty)
    if replacement is not None:
        return replacement
    return _disallowed_filename_characters.sub('_', dirty)
40
41
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.  Subclasses
    override update() to (re)build the entries when the directory is
    stale, and override the mutating operations (create, mkdir, unlink,
    rmdir, rename) when they support them; here those raise
    NotImplementedError.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: integer inode number of the parent directory.

        :inodes: inode table used to register, invalidate and delete
        entries (add_entry, del_entry, invalidate_entry, invalidate_inode,
        touch, encoding).

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # Not known yet; presumably assigned by the inode table when this
        # directory is registered via add_entry().
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overriden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return whether this directory is backed by persistent storage (never, here)."""
        return False

    def checkupdate(self):
        """Run update() if the directory is stale.

        API HTTP errors raised by update() are logged as warnings instead
        of being propagated to the caller.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list snapshot of (name, entry) pairs."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Mark this directory fresh and record the access in the inode table."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuses
        entries that are the same in both the old and new lists, creates new
        entries, and deletes old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list, or None to skip it.

        If anything was added or removed, the directory inode is invalidated
        and its mtime bumped.  The directory is marked fresh afterwards.
        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not in found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any direct entry reports in_use()."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children):
        """Return True if this directory (per FreshBase) or any entry has a reference."""
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[n])
        self.inodes.invalidate_inode(self.inode)
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate every entry name and this inode, recursing into entries."""
        for n, e in self._entries.items():
            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
            e.kernel_invalidate()
        self.inodes.invalidate_inode(self.inode)

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
218
219
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.
    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Create (or revive) the directory entry for collection item *item*.

        An item that already carries a fuse_entry must have that entry
        marked dead and still holding an inode; it is revived and reused.
        A subcollection becomes a populated CollectionDirectoryBase, and any
        other item becomes a FuseArvadosFile.  The entry is stored under the
        sanitized *name* and recorded back on item.fuse_entry.
        """
        name = sanitize_filename(name)
        previous = getattr(item, "fuse_entry", None)
        if previous is not None:
            # Presumably an item moved here from elsewhere: only a dead
            # entry that still owns an inode may be reattached.
            if previous.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if previous.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            previous.dead = False
            self._entries[name] = previous
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, item)
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(
                FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback mirroring ADD/DEL/MOD into FUSE state.

        Events for any collection other than self.collection are ignored.
        Entry changes are made while holding llfuse.lock.
        """
        if not (collection == self.collection):
            return
        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                gone = self._entries.pop(name)
                self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                self.inodes.del_entry(gone)
            elif event == arvados.collection.MOD:
                touched = getattr(item, "fuse_entry", None)
                if touched is None:
                    touched = self._entries.get(name)
                if touched is not None:
                    self.inodes.invalidate_inode(touched.inode)

    def populate(self, mtime):
        """Set the mtime, subscribe to collection events and add every item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for item_name, member in self.collection.items():
            self.new_entry(item_name, member, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released during the save."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file *name* in the collection."""
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create subdirectory *name* (and any missing parents) in the collection."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove file *name* from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove subdirectory *name* from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move *name_old* from directory *src* to *name_new* here.

        Raises FUSEError EPERM if *src* is not a collection directory,
        ENOTEMPTY when replacing a non-empty directory, ENOTDIR when moving
        a directory over a file, and EISDIR when moving a file over a
        directory.  Both collections are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            target = self[name_new]
            moving_is_file = isinstance(moving, FuseArvadosFile)
            moving_is_dir = isinstance(moving, CollectionDirectoryBase)
            target_is_file = isinstance(target, FuseArvadosFile)
            target_is_dir = isinstance(target, CollectionDirectoryBase)
            # file over file needs no check; overwrite is allowed.
            if moving_is_dir and target_is_dir:
                if len(target) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_is_dir and target_is_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_is_file and target_is_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
340
341
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection.

    The collection is identified by `collection_locator`.  A uuid locator is
    loaded as a writable `arvados.collection.Collection`; any other locator
    (e.g. a portable data hash) is loaded with `CollectionReader`.  Contents
    are loaded lazily by update().
    """

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a collection directory.

        :api: Arvados API client.

        :num_retries: retry count passed to API and Keep requests.

        :collection_record: a collection record dict (must contain 'uuid';
        'modified_at' supplies the mtime), a locator string, or None.

        :explicit_collection: accepted for interface compatibility; not used
        by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Half the blob signature TTL (default TTL 2 hours); presumably so
            # signed locators are refreshed before they expire.
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            # Report the exception object directly: `sys` is not imported in
            # this module, so sys.exc_info() would itself raise NameError.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", e)
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Default so writable() never hits a missing attribute when there is
        # no locator; only uuid locators are writable.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record *i* refers to this collection by uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Return True only for collections identified by a uuid."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of *coll_reader*.

        Existing entries are cleared first if this directory already has an
        inode.  The record's 'modified_at' and 'uuid' update the mtime and
        locator, and the .arvados#collection file (if created) is refreshed.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection contents.

        :to_record_version: if the loaded collection already knows this
        version, the refresh is skipped.

        Returns True on success or when nothing needed to be fetched, and
        False if loading failed (in which case the directory is
        invalidated).  Network work happens with llfuse.lock released.
        """
        try:
            # A portable data hash is immutable: once loaded, never reload.
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed it while we waited for
                        # the lock; the directory is up to date.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry *item*; '.arvados#collection' is a lazily created record file."""
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached record and record file, then mark stale."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save (if writable) and stop the collection's background threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
512
513
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """A Collection whose save operations deliberately do nothing."""

        def save(self):
            """No-op: scratch collections are never saved."""

        def save_new(self):
            """No-op: scratch collections are never saved."""

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the collection event, then invalidate the record file if it exists."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict (uuid None, current manifest and hash) with llfuse.lock released."""
        with llfuse.lock_released:
            snapshot = {
                "uuid": None,
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
            }
        return snapshot

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        """Return entry *item*; '.arvados#collection' is a lazily created JSON file."""
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if any) and then this directory."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
585
586
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """:pdh_only: if True, only portable data hashes (not uuids) are accepted."""
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if *k* names a loadable collection, loading it on first access.

        Names that are neither a portable data hash nor (unless pdh_only) a
        uuid are rejected without contacting the server.  A successfully
        loaded collection is kept as an entry; a failed one is discarded.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Bound before the try so the except clause never references an
        # unassigned name when the constructor or add_entry() raises.
        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Another lookup already added it; drop the duplicate.
                    self.inodes.del_entry(e)
                return True
            else:
                # Encode like every other invalidate_entry() caller in this module.
                self.inodes.invalidate_entry(self.inode, k.encode(self.inodes.encoding))
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
665
666
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user.

    Each distinct tag-link name becomes a TagDirectory entry; entries are
    polled using the poll_time given here.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Rebuild the entries from the distinct names of all tag links."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'],
                distinct=True,
            )
            tags = request.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def same_tag(entry, link):
            return entry.tag == link['name']

        def make_tag_dir(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, same_tag, make_tag_dir)
692
693
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.

    Entries are named by the tagged collection's uuid (the link head_uuid).
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Rebuild the entries from tag links named self.tag that point at collections."""
        link_filters = [['link_class', '=', 'tag'],
                        ['name', '=', self.tag],
                        ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            response = self.api.links().list(
                filters=link_filters,
                select=['head_uuid'],
            ).execute(num_retries=self.num_retries)

        def head_uuid(link):
            return link['head_uuid']

        def same_collection(entry, link):
            return entry.collection_locator == link['head_uuid']

        def make_collection_dir(link):
            return CollectionDirectory(self.inode, self.inodes, self.api,
                                       self.num_retries, link['head_uuid'])

        self.merge(response['items'], head_uuid, same_collection, make_collection_dir)
724
725
726 class ProjectDirectory(Directory):
727     """A special directory that contains the contents of a project."""
728
729     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
730                  poll=False, poll_time=60):
731         super(ProjectDirectory, self).__init__(parent_inode, inodes)
732         self.api = api
733         self.num_retries = num_retries
734         self.project_object = project_object
735         self.project_object_file = None
736         self.project_uuid = project_object['uuid']
737         self._poll = poll
738         self._poll_time = poll_time
739         self._updating_lock = threading.Lock()
740         self._current_user = None
741
742     def want_event_subscribe(self):
743         return True
744
745     def createDirectory(self, i):
746         if collection_uuid_pattern.match(i['uuid']):
747             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
748         elif group_uuid_pattern.match(i['uuid']):
749             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
750         elif link_uuid_pattern.match(i['uuid']):
751             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
752                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
753             else:
754                 return None
755         elif uuid_pattern.match(i['uuid']):
756             return ObjectFile(self.parent_inode, i)
757         else:
758             return None
759
760     def uuid(self):
761         return self.project_uuid
762
763     @use_counter
764     def update(self):
765         if self.project_object_file == None:
766             self.project_object_file = ObjectFile(self.inode, self.project_object)
767             self.inodes.add_entry(self.project_object_file)
768
769         def namefn(i):
770             if 'name' in i:
771                 if i['name'] is None or len(i['name']) == 0:
772                     return None
773                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
774                     # collection or subproject
775                     return i['name']
776                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
777                     # name link
778                     return i['name']
779                 elif 'kind' in i and i['kind'].startswith('arvados#'):
780                     # something else
781                     return "{}.{}".format(i['name'], i['kind'][8:])
782             else:
783                 return None
784
785         def samefn(a, i):
786             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
787                 return a.uuid() == i['uuid']
788             elif isinstance(a, ObjectFile):
789                 return a.uuid() == i['uuid'] and not a.stale()
790             return False
791
792         try:
793             with llfuse.lock_released:
794                 self._updating_lock.acquire()
795                 if not self.stale():
796                     return
797
798                 if group_uuid_pattern.match(self.project_uuid):
799                     self.project_object = self.api.groups().get(
800                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
801                 elif user_uuid_pattern.match(self.project_uuid):
802                     self.project_object = self.api.users().get(
803                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
804
805                 contents = arvados.util.list_all(self.api.groups().contents,
806                                                  self.num_retries, uuid=self.project_uuid)
807
808             # end with llfuse.lock_released, re-acquire lock
809
810             self.merge(contents,
811                        namefn,
812                        samefn,
813                        self.createDirectory)
814         finally:
815             self._updating_lock.release()
816
817     @use_counter
818     @check_update
819     def __getitem__(self, item):
820         if item == '.arvados#project':
821             return self.project_object_file
822         else:
823             return super(ProjectDirectory, self).__getitem__(item)
824
825     def __contains__(self, k):
826         if k == '.arvados#project':
827             return True
828         else:
829             return super(ProjectDirectory, self).__contains__(k)
830
831     @use_counter
832     @check_update
833     def writable(self):
834         with llfuse.lock_released:
835             if not self._current_user:
836                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
837             return self._current_user["uuid"] in self.project_object["writable_by"]
838
839     def persisted(self):
840         return True
841
842     @use_counter
843     @check_update
844     def mkdir(self, name):
845         try:
846             with llfuse.lock_released:
847                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
848                                                     "name": name,
849                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
850             self.invalidate()
851         except apiclient_errors.Error as error:
852             _logger.error(error)
853             raise llfuse.FUSEError(errno.EEXIST)
854
855     @use_counter
856     @check_update
857     def rmdir(self, name):
858         if name not in self:
859             raise llfuse.FUSEError(errno.ENOENT)
860         if not isinstance(self[name], CollectionDirectory):
861             raise llfuse.FUSEError(errno.EPERM)
862         if len(self[name]) > 0:
863             raise llfuse.FUSEError(errno.ENOTEMPTY)
864         with llfuse.lock_released:
865             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
866         self.invalidate()
867
868     @use_counter
869     @check_update
870     def rename(self, name_old, name_new, src):
871         if not isinstance(src, ProjectDirectory):
872             raise llfuse.FUSEError(errno.EPERM)
873
874         ent = src[name_old]
875
876         if not isinstance(ent, CollectionDirectory):
877             raise llfuse.FUSEError(errno.EPERM)
878
879         if name_new in self:
880             # POSIX semantics for replacing one directory with another is
881             # tricky (the target directory must be empty, the operation must be
882             # atomic which isn't possible with the Arvados API as of this
883             # writing) so don't support that.
884             raise llfuse.FUSEError(errno.EPERM)
885
886         self.api.collections().update(uuid=ent.uuid(),
887                                       body={"owner_uuid": self.uuid(),
888                                             "name": name_new}).execute(num_retries=self.num_retries)
889
890         # Acually move the entry from source directory to this directory.
891         del src._entries[name_old]
892         self._entries[name_new] = ent
893         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
894
895
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Create the shared-projects directory.

        Fetches the current user's record once, here.  `exclude` is not
        referenced by this constructor.  `_poll` is set to True regardless
        of the `poll` argument, and update() passes it on to the
        ProjectDirectory children it creates.
        NOTE(review): confirm that always forcing polling on is intended.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the listing of projects shared with the current user.

        A "root" is a readable project that:
          * is not owned by the current user, and
          * has an owner that is not itself a readable project.
        Each root owner that resolves to a readable user or group becomes an
        entry: named after the group/user "name", or "first_name last_name".
        A root whose owner cannot be resolved is listed directly under its
        own name.  API calls run with the llfuse lock released.  Errors
        raised while merging are logged, not propagated.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {ob['uuid']: ob for ob in all_projects}

            roots = []
            root_owners = set()
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners.add(ob['owner_uuid'])

            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            # Make the owning users/groups resolvable alongside the projects.
            for owner in lusers:
                objects[owner["uuid"]] = owner
            for owner in lgroups:
                objects[owner["uuid"]] = owner

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if obr.get("name"):
                        contents[obr["name"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message; calling it with no
            # arguments raised TypeError and hid the original error.
            _logger.exception("arv-mount: error updating shared directory")

    def want_event_subscribe(self):
        """Report that this directory wants to be subscribed to change events."""
        return True