9463: Using finally: on except: clause to stop the updater thread on error or finishe...
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
_logger = logging.getLogger('arvados.arvados_fuse')


# Characters that FUSE or Linux cannot accept inside a filename.  Any of
# these found in a collection filename shows up as an underscore in the
# fuse mount.
_disallowed_filename_characters = re.compile('[\x00/]')


def sanitize_filename(dirty):
    """Return a version of `dirty` that is safe to use as a directory entry.

    None passes through unchanged.  The empty name and '.' become '_', and
    '..' becomes '__' (note: '.' and '..' are not reachable if the API
    server is newer than #6277).  Every other name has each disallowed
    character (NUL or '/') replaced by '_'.
    """
    if dirty is None:
        return None
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
40
41
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: integer inode number of the parent directory.

        :inodes: inode table used to add, touch, invalidate and delete
        entries.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # Presumably assigned once the directory is registered with
        # inodes.add_entry() -- TODO confirm against the inode table.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overriden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return whether this directory is backed by persistent data (never, here)."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HTTP error raised by update() is logged as a warning instead of
        being propagated to the caller.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Mark the directory fresh and touch it in the inode table."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not in found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Does nothing and returns False when the directory is in use (unless
        `force` is set) or when any child refuses to clear; in the latter case
        the original entries are restored.  Returns True once every entry has
        been invalidated and deleted.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            for n in oldentries:
                if not oldentries[n].clear(force):
                    self._entries = oldentries
                    return False
            for n in oldentries:
                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                self.inodes.del_entry(oldentries[n])
            self.inodes.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    # The operations below must be provided by subclasses that support them.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
204
205
class CollectionDirectoryBase(Directory):
    """Directory view of an Arvados `Collection` or one of its subcollections.

    Subcollections are represented directly by this class.  It is also the
    base class of CollectionDirectory, which adds loading and saving of
    Collection records.

    Most operations are forwarded to the underlying Arvados `Collection`
    object.  That object reports additions, removals and modifications
    through the notify callback `on_event`.  In response, the FUSE inodes
    and directory entries are created, deleted or invalidated.
    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Attach `item` to this directory under the sanitized `name`.

        If the item already has a `fuse_entry`, that entry is re-attached.
        This is only allowed when the entry is dead but still owns an inode.
        Otherwise a new CollectionDirectoryBase (for subcollections) or
        FuseArvadosFile is registered with the inode table.  In every case
        the resulting entry is stored back on `item.fuse_entry`.
        """
        name = sanitize_filename(name)
        previous = getattr(item, "fuse_entry", None)
        if previous is not None:
            # Re-parenting an entry that was detached elsewhere
            # (presumably by a rename/move -- TODO confirm).
            if previous.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if previous.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            previous.dead = False
            self._entries[name] = previous
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = self.inodes.add_entry(
                CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name] = subdir
            subdir.populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(
                FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Mirror an ADD/DEL/MOD notification from our collection into FUSE.

        Notifications about other collections are ignored.  The work is
        done while holding llfuse.lock.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries.pop(name)
                self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                attached = getattr(item, "fuse_entry", None)
                if attached is not None:
                    self.inodes.invalidate_inode(attached.inode)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Subscribe to collection events and add an entry per collection item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for child_name, child in self.collection.items():
            self.new_entry(child_name, child, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection (done with llfuse.lock released)."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file called `name` in the collection."""
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create the subcollection `name` (including any missing parents)."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove the file `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove the subcollection `name`, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises FUSEError with errno:
        - EPERM when `src` is not a collection directory;
        - ENOTEMPTY when replacing a non-empty directory;
        - ENOTDIR when moving a directory onto a file;
        - EISDIR when moving a file onto a directory.
        Both directories are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            replaced = self[name_new]
            if isinstance(moving, CollectionDirectoryBase):
                if isinstance(replaced, CollectionDirectoryBase):
                    if len(replaced) > 0:
                        raise llfuse.FUSEError(errno.ENOTEMPTY)
                elif isinstance(replaced, FuseArvadosFile):
                    raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(moving, FuseArvadosFile) and isinstance(replaced, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self, force=False):
        """Clear entries via Directory.clear, then drop the collection reference."""
        result = super(CollectionDirectoryBase, self).clear(force)
        self.collection = None
        return result
327
328
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a (not yet loaded) collection root directory.

        :collection_record: either an API record dict (its 'uuid' and
        'modified_at' are used) or a locator string (uuid or portable data
        hash), or None.

        :explicit_collection: accepted but not used by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            # The original bare except referenced `sys`, which this module
            # never imports, so the fallback raised NameError instead of
            # falling back.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", e)
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Only a collection addressed by uuid (not by portable data hash) is
        # writable.  Default to read-only so writable() never hits an unset
        # attribute when there is no locator.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return whether record `i` refers to this collection (by uuid or pdh)."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Subscribe to events only for collections addressed by uuid."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the current contents with `coll_reader` and its record."""
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Bring the directory up to date with the API server / Keep.

        :to_record_version: passed to Collection.known_past_version() to skip
        updates that have already been applied.

        Returns True when the directory is up to date afterwards (including
        when it already was), False when loading failed; on failure the
        directory is invalidated and the error is logged.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Content addressed by portable data hash never changes.
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed us while we waited for the
                        # lock; that is success, not failure (callers such as
                        # MagicDirectory test the return value).
                        return True

                    _logger.debug("Updating %s", to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                # fresh() runs while _updating_lock is still held, so threads
                # waiting on the lock see a non-stale directory.
                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save (if writable) and stop the collection's background threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()
495
496
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Scratch directory backed by an Arvados collection that is never saved.

    Keep can be used as scratch space this way.  A userspace program can read
    the `.arvados#collection` file to obtain the current manifest, for
    example to save a snapshot of the scratch data or to use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        # Saving is intentionally a no-op for scratch collections.
        def save(self):
            pass
        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as usual, then invalidate the record file if present."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict (uuid None) from the current manifest."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
        return {
            "uuid": None,
            "manifest_text": manifest,
            "portable_data_hash": pdh,
        }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            # Created lazily; its contents come from collection_record().
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        """Stop the collection's background threads (nothing is saved)."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file, if any, and then this directory."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
568
569
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """:pdh_only: when True, only portable data hashes (not uuids) are resolved."""
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return whether `k` names a known or loadable collection.

        Unknown names that look like a portable data hash (or a uuid, unless
        pdh_only is set) are loaded on demand; a successfully loaded
        collection is kept as a new entry.  Any error while loading is
        logged at debug level and reported as "not present".
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Bound before the try so the error path can tell whether an entry
        # was actually registered (previously an exception from the
        # constructor or add_entry() caused UnboundLocalError here).
        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    self.inodes.del_entry(e)
                return True
            else:
                # Entry names are passed to invalidate_entry encoded, as
                # everywhere else in this module.
                self.inodes.invalidate_entry(self.inode, k.encode(self.inodes.encoding))
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
648
649
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates each of its entries."""

    def invalidate(self):
        """Invalidate this directory and every child entry.

        Errors are logged with a traceback rather than propagated.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it with
            # none raised TypeError from inside this handler.
            _logger.exception("Error invalidating directory inode %s", self.inode)
658
659
class TagsDirectory(RecursiveInvalidateDirectory):
    """Directory listing, as subdirectories, every tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Fetch the distinct tag names and merge them as TagDirectory entries.

        If the API response has no 'items' key, the entries are left as
        they are.
        """
        with llfuse.lock_released:
            query = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                )
            tags = query.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
685
686
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List collections tagged with self.tag and merge them as entries.

        Like TagsDirectory.update, a response without an 'items' key leaves
        the current entries untouched instead of raising KeyError.
        """
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)
        if "items" in taggedcollections:
            self.merge(taggedcollections['items'],
                       lambda i: i['head_uuid'],
                       lambda a, i: a.collection_locator == i['head_uuid'],
                       lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
717
718
719 class ProjectDirectory(Directory):
720     """A special directory that contains the contents of a project."""
721
722     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
723                  poll=False, poll_time=60):
724         super(ProjectDirectory, self).__init__(parent_inode, inodes)
725         self.api = api
726         self.num_retries = num_retries
727         self.project_object = project_object
728         self.project_object_file = None
729         self.project_uuid = project_object['uuid']
730         self._poll = poll
731         self._poll_time = poll_time
732         self._updating_lock = threading.Lock()
733         self._current_user = None
734
735     def want_event_subscribe(self):
736         return True
737
738     def createDirectory(self, i):
739         if collection_uuid_pattern.match(i['uuid']):
740             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
741         elif group_uuid_pattern.match(i['uuid']):
742             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
743         elif link_uuid_pattern.match(i['uuid']):
744             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
745                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
746             else:
747                 return None
748         elif uuid_pattern.match(i['uuid']):
749             return ObjectFile(self.parent_inode, i)
750         else:
751             return None
752
753     def uuid(self):
754         return self.project_uuid
755
756     @use_counter
757     def update(self):
758         if self.project_object_file == None:
759             self.project_object_file = ObjectFile(self.inode, self.project_object)
760             self.inodes.add_entry(self.project_object_file)
761
762         def namefn(i):
763             if 'name' in i:
764                 if i['name'] is None or len(i['name']) == 0:
765                     return None
766                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
767                     # collection or subproject
768                     return i['name']
769                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
770                     # name link
771                     return i['name']
772                 elif 'kind' in i and i['kind'].startswith('arvados#'):
773                     # something else
774                     return "{}.{}".format(i['name'], i['kind'][8:])
775             else:
776                 return None
777
778         def samefn(a, i):
779             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
780                 return a.uuid() == i['uuid']
781             elif isinstance(a, ObjectFile):
782                 return a.uuid() == i['uuid'] and not a.stale()
783             return False
784
785         try:
786             with llfuse.lock_released:
787                 self._updating_lock.acquire()
788                 if not self.stale():
789                     return
790
791                 if group_uuid_pattern.match(self.project_uuid):
792                     self.project_object = self.api.groups().get(
793                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
794                 elif user_uuid_pattern.match(self.project_uuid):
795                     self.project_object = self.api.users().get(
796                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
797
798                 contents = arvados.util.list_all(self.api.groups().contents,
799                                                  self.num_retries, uuid=self.project_uuid)
800
801             # end with llfuse.lock_released, re-acquire lock
802
803             self.merge(contents,
804                        namefn,
805                        samefn,
806                        self.createDirectory)
807         finally:
808             self._updating_lock.release()
809
810     @use_counter
811     @check_update
812     def __getitem__(self, item):
813         if item == '.arvados#project':
814             return self.project_object_file
815         else:
816             return super(ProjectDirectory, self).__getitem__(item)
817
818     def __contains__(self, k):
819         if k == '.arvados#project':
820             return True
821         else:
822             return super(ProjectDirectory, self).__contains__(k)
823
824     @use_counter
825     @check_update
826     def writable(self):
827         with llfuse.lock_released:
828             if not self._current_user:
829                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
830             return self._current_user["uuid"] in self.project_object["writable_by"]
831
832     def persisted(self):
833         return True
834
835     @use_counter
836     @check_update
837     def mkdir(self, name):
838         try:
839             with llfuse.lock_released:
840                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
841                                                     "name": name,
842                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
843             self.invalidate()
844         except apiclient_errors.Error as error:
845             _logger.error(error)
846             raise llfuse.FUSEError(errno.EEXIST)
847
848     @use_counter
849     @check_update
850     def rmdir(self, name):
851         if name not in self:
852             raise llfuse.FUSEError(errno.ENOENT)
853         if not isinstance(self[name], CollectionDirectory):
854             raise llfuse.FUSEError(errno.EPERM)
855         if len(self[name]) > 0:
856             raise llfuse.FUSEError(errno.ENOTEMPTY)
857         with llfuse.lock_released:
858             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
859         self.invalidate()
860
861     @use_counter
862     @check_update
863     def rename(self, name_old, name_new, src):
864         if not isinstance(src, ProjectDirectory):
865             raise llfuse.FUSEError(errno.EPERM)
866
867         ent = src[name_old]
868
869         if not isinstance(ent, CollectionDirectory):
870             raise llfuse.FUSEError(errno.EPERM)
871
872         if name_new in self:
873             # POSIX semantics for replacing one directory with another is
874             # tricky (the target directory must be empty, the operation must be
875             # atomic which isn't possible with the Arvados API as of this
876             # writing) so don't support that.
877             raise llfuse.FUSEError(errno.EPERM)
878
879         self.api.collections().update(uuid=ent.uuid(),
880                                       body={"owner_uuid": self.uuid(),
881                                             "name": name_new}).execute(num_retries=self.num_retries)
882
883         # Acually move the entry from source directory to this directory.
884         del src._entries[name_old]
885         self._entries[name_new] = ent
886         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
887
888
889 class SharedDirectory(Directory):
890     """A special directory that represents users or groups who have shared projects with me."""
891
892     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
893                  poll=False, poll_time=60):
894         super(SharedDirectory, self).__init__(parent_inode, inodes)
895         self.api = api
896         self.num_retries = num_retries
897         self.current_user = api.users().current().execute(num_retries=num_retries)
898         self._poll = True
899         self._poll_time = poll_time
900
901     @use_counter
902     def update(self):
903         with llfuse.lock_released:
904             all_projects = arvados.util.list_all(
905                 self.api.groups().list, self.num_retries,
906                 filters=[['group_class','=','project']])
907             objects = {}
908             for ob in all_projects:
909                 objects[ob['uuid']] = ob
910
911             roots = []
912             root_owners = {}
913             for ob in all_projects:
914                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
915                     roots.append(ob)
916                     root_owners[ob['owner_uuid']] = True
917
918             lusers = arvados.util.list_all(
919                 self.api.users().list, self.num_retries,
920                 filters=[['uuid','in', list(root_owners)]])
921             lgroups = arvados.util.list_all(
922                 self.api.groups().list, self.num_retries,
923                 filters=[['uuid','in', list(root_owners)]])
924
925             users = {}
926             groups = {}
927
928             for l in lusers:
929                 objects[l["uuid"]] = l
930             for l in lgroups:
931                 objects[l["uuid"]] = l
932
933             contents = {}
934             for r in root_owners:
935                 if r in objects:
936                     obr = objects[r]
937                     if obr.get("name"):
938                         contents[obr["name"]] = obr
939                     #elif obr.get("username"):
940                     #    contents[obr["username"]] = obr
941                     elif "first_name" in obr:
942                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
943
944
945             for r in roots:
946                 if r['owner_uuid'] not in objects:
947                     contents[r['name']] = r
948
949         # end with llfuse.lock_released, re-acquire lock
950
951         try:
952             self.merge(contents.items(),
953                        lambda i: i[0],
954                        lambda a, i: a.uuid() == i[1]['uuid'],
955                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
956         except Exception:
957             _logger.exception()
958
959     def want_event_subscribe(self):
960         return True