10117: Consider both use_count and ref_count and check subdirectories to determine...
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
19 _logger = logging.getLogger('arvados.arvados_fuse')
20
21
22 # Match any character which FUSE or Linux cannot accommodate as part
23 # of a filename. (If present in a collection filename, they will
24 # appear as underscores in the fuse mount.)
25 _disallowed_filename_characters = re.compile('[\x00/]')
26
27 # '.' and '..' are not reachable if API server is newer than #6277
28 def sanitize_filename(dirty):
29     """Replace disallowed filename characters with harmless "_"."""
30     if dirty is None:
31         return None
32     elif dirty == '':
33         return '_'
34     elif dirty == '.':
35         return '_'
36     elif dirty == '..':
37         return '__'
38     else:
39         return _disallowed_filename_characters.sub('_', dirty)
40
41
42 class Directory(FreshBase):
43     """Generic directory object, backed by a dict.
44
45     Consists of a set of entries with the key representing the filename
46     and the value referencing a File or Directory object.
47     """
48
49     def __init__(self, parent_inode, inodes):
50         """parent_inode is the integer inode number"""
51
52         super(Directory, self).__init__()
53
54         self.inode = None
55         if not isinstance(parent_inode, int):
56             raise Exception("parent_inode should be an int")
57         self.parent_inode = parent_inode
58         self.inodes = inodes
59         self._entries = {}
60         self._mtime = time.time()
61
62     #  Overriden by subclasses to implement logic to update the entries dict
63     #  when the directory is stale
64     @use_counter
65     def update(self):
66         pass
67
68     # Only used when computing the size of the disk footprint of the directory
69     # (stub)
70     def size(self):
71         return 0
72
73     def persisted(self):
74         return False
75
76     def checkupdate(self):
77         if self.stale():
78             try:
79                 self.update()
80             except apiclient.errors.HttpError as e:
81                 _logger.warn(e)
82
83     @use_counter
84     @check_update
85     def __getitem__(self, item):
86         return self._entries[item]
87
88     @use_counter
89     @check_update
90     def items(self):
91         return list(self._entries.items())
92
93     @use_counter
94     @check_update
95     def __contains__(self, k):
96         return k in self._entries
97
98     @use_counter
99     @check_update
100     def __len__(self):
101         return len(self._entries)
102
103     def fresh(self):
104         self.inodes.touch(self)
105         super(Directory, self).fresh()
106
107     def merge(self, items, fn, same, new_entry):
108         """Helper method for updating the contents of the directory.
109
110         Takes a list describing the new contents of the directory, reuse
111         entries that are the same in both the old and new lists, create new
112         entries, and delete old entries missing from the new list.
113
114         :items: iterable with new directory contents
115
116         :fn: function to take an entry in 'items' and return the desired file or
117         directory name, or None if this entry should be skipped
118
119         :same: function to compare an existing entry (a File or Directory
120         object) with an entry in the items list to determine whether to keep
121         the existing entry.
122
123         :new_entry: function to create a new directory entry (File or Directory
124         object) from an entry in the items list.
125
126         """
127
128         oldentries = self._entries
129         self._entries = {}
130         changed = False
131         for i in items:
132             name = sanitize_filename(fn(i))
133             if name:
134                 if name in oldentries and same(oldentries[name], i):
135                     # move existing directory entry over
136                     self._entries[name] = oldentries[name]
137                     del oldentries[name]
138                 else:
139                     _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
140                     # create new directory entry
141                     ent = new_entry(i)
142                     if ent is not None:
143                         self._entries[name] = self.inodes.add_entry(ent)
144                         changed = True
145
146         # delete any other directory entries that were not in found in 'items'
147         for i in oldentries:
148             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
149             self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
150             self.inodes.del_entry(oldentries[i])
151             changed = True
152
153         if changed:
154             self.inodes.invalidate_inode(self.inode)
155             self._mtime = time.time()
156
157         self.fresh()
158
159     def can_clear(self):
160         if not super(Directory, self).can_clear():
161             return False
162         for v in self._entries.itervalues():
163             if not v.can_clear():
164                 return False
165         return True
166
167     def clear(self):
168         """Delete all entries"""
169         oldentries = self._entries
170         self._entries = {}
171         for n in oldentries:
172             oldentries[n].clear()
173         for n in oldentries:
174             self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
175             self.inodes.del_entry(oldentries[n])
176         self.inodes.invalidate_inode(self.inode)
177         self.invalidate()
178
179     def mtime(self):
180         return self._mtime
181
182     def writable(self):
183         return False
184
185     def flush(self):
186         pass
187
188     def want_event_subscribe(self):
189         raise NotImplementedError()
190
191     def create(self, name):
192         raise NotImplementedError()
193
194     def mkdir(self, name):
195         raise NotImplementedError()
196
197     def unlink(self, name):
198         raise NotImplementedError()
199
200     def rmdir(self, name):
201         raise NotImplementedError()
202
203     def rename(self, name_old, name_new, src):
204         raise NotImplementedError()
205
206
207 class CollectionDirectoryBase(Directory):
208     """Represent an Arvados Collection as a directory.
209
210     This class is used for Subcollections, and is also the base class for
211     CollectionDirectory, which implements collection loading/saving on
212     Collection records.
213
214     Most operations act only the underlying Arvados `Collection` object.  The
215     `Collection` object signals via a notify callback to
216     `CollectionDirectoryBase.on_event` that an item was added, removed or
217     modified.  FUSE inodes and directory entries are created, deleted or
218     invalidated in response to these events.
219
220     """
221
222     def __init__(self, parent_inode, inodes, collection):
223         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
224         self.collection = collection
225
226     def new_entry(self, name, item, mtime):
227         name = sanitize_filename(name)
228         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
229             if item.fuse_entry.dead is not True:
230                 raise Exception("Can only reparent dead inode entry")
231             if item.fuse_entry.inode is None:
232                 raise Exception("Reparented entry must still have valid inode")
233             item.fuse_entry.dead = False
234             self._entries[name] = item.fuse_entry
235         elif isinstance(item, arvados.collection.RichCollectionBase):
236             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
237             self._entries[name].populate(mtime)
238         else:
239             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
240         item.fuse_entry = self._entries[name]
241
242     def on_event(self, event, collection, name, item):
243         if collection == self.collection:
244             name = sanitize_filename(name)
245             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
246             with llfuse.lock:
247                 if event == arvados.collection.ADD:
248                     self.new_entry(name, item, self.mtime())
249                 elif event == arvados.collection.DEL:
250                     ent = self._entries[name]
251                     del self._entries[name]
252                     self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
253                     self.inodes.del_entry(ent)
254                 elif event == arvados.collection.MOD:
255                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
256                         self.inodes.invalidate_inode(item.fuse_entry.inode)
257                     elif name in self._entries:
258                         self.inodes.invalidate_inode(self._entries[name].inode)
259
260     def populate(self, mtime):
261         self._mtime = mtime
262         self.collection.subscribe(self.on_event)
263         for entry, item in self.collection.items():
264             self.new_entry(entry, item, self.mtime())
265
266     def writable(self):
267         return self.collection.writable()
268
269     @use_counter
270     def flush(self):
271         with llfuse.lock_released:
272             self.collection.root_collection().save()
273
274     @use_counter
275     @check_update
276     def create(self, name):
277         with llfuse.lock_released:
278             self.collection.open(name, "w").close()
279
280     @use_counter
281     @check_update
282     def mkdir(self, name):
283         with llfuse.lock_released:
284             self.collection.mkdirs(name)
285
286     @use_counter
287     @check_update
288     def unlink(self, name):
289         with llfuse.lock_released:
290             self.collection.remove(name)
291         self.flush()
292
293     @use_counter
294     @check_update
295     def rmdir(self, name):
296         with llfuse.lock_released:
297             self.collection.remove(name)
298         self.flush()
299
300     @use_counter
301     @check_update
302     def rename(self, name_old, name_new, src):
303         if not isinstance(src, CollectionDirectoryBase):
304             raise llfuse.FUSEError(errno.EPERM)
305
306         if name_new in self:
307             ent = src[name_old]
308             tgt = self[name_new]
309             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
310                 pass
311             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
312                 if len(tgt) > 0:
313                     raise llfuse.FUSEError(errno.ENOTEMPTY)
314             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
315                 raise llfuse.FUSEError(errno.ENOTDIR)
316             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
317                 raise llfuse.FUSEError(errno.EISDIR)
318
319         with llfuse.lock_released:
320             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
321         self.flush()
322         src.flush()
323
324     def clear(self):
325         r = super(CollectionDirectoryBase, self).clear()
326         self.collection = None
327         return r
328
329
330 class CollectionDirectory(CollectionDirectoryBase):
331     """Represents the root of a directory tree representing a collection."""
332
333     def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
334         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
335         self.api = api
336         self.num_retries = num_retries
337         self.collection_record_file = None
338         self.collection_record = None
339         self._poll = True
340         try:
341             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
342         except:
343             _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
344             self._poll_time = 60*60
345
346         if isinstance(collection_record, dict):
347             self.collection_locator = collection_record['uuid']
348             self._mtime = convertTime(collection_record.get('modified_at'))
349         else:
350             self.collection_locator = collection_record
351             self._mtime = 0
352         self._manifest_size = 0
353         if self.collection_locator:
354             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
355         self._updating_lock = threading.Lock()
356
357     def same(self, i):
358         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
359
360     def writable(self):
361         return self.collection.writable() if self.collection is not None else self._writable
362
363     def want_event_subscribe(self):
364         return (uuid_pattern.match(self.collection_locator) is not None)
365
366     # Used by arv-web.py to switch the contents of the CollectionDirectory
367     def change_collection(self, new_locator):
368         """Switch the contents of the CollectionDirectory.
369
370         Must be called with llfuse.lock held.
371         """
372
373         self.collection_locator = new_locator
374         self.collection_record = None
375         self.update()
376
377     def new_collection(self, new_collection_record, coll_reader):
378         if self.inode:
379             self.clear()
380
381         self.collection_record = new_collection_record
382
383         if self.collection_record:
384             self._mtime = convertTime(self.collection_record.get('modified_at'))
385             self.collection_locator = self.collection_record["uuid"]
386             if self.collection_record_file is not None:
387                 self.collection_record_file.update(self.collection_record)
388
389         self.collection = coll_reader
390         self.populate(self.mtime())
391
392     def uuid(self):
393         return self.collection_locator
394
395     @use_counter
396     def update(self, to_record_version=None):
397         try:
398             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
399                 return True
400
401             if self.collection_locator is None:
402                 self.fresh()
403                 return True
404
405             try:
406                 with llfuse.lock_released:
407                     self._updating_lock.acquire()
408                     if not self.stale():
409                         return
410
411                     _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
412                     if self.collection is not None:
413                         if self.collection.known_past_version(to_record_version):
414                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
415                         else:
416                             self.collection.update()
417                     else:
418                         if uuid_pattern.match(self.collection_locator):
419                             coll_reader = arvados.collection.Collection(
420                                 self.collection_locator, self.api, self.api.keep,
421                                 num_retries=self.num_retries)
422                         else:
423                             coll_reader = arvados.collection.CollectionReader(
424                                 self.collection_locator, self.api, self.api.keep,
425                                 num_retries=self.num_retries)
426                         new_collection_record = coll_reader.api_response() or {}
427                         # If the Collection only exists in Keep, there will be no API
428                         # response.  Fill in the fields we need.
429                         if 'uuid' not in new_collection_record:
430                             new_collection_record['uuid'] = self.collection_locator
431                         if "portable_data_hash" not in new_collection_record:
432                             new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
433                         if 'manifest_text' not in new_collection_record:
434                             new_collection_record['manifest_text'] = coll_reader.manifest_text()
435
436                         if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
437                             self.new_collection(new_collection_record, coll_reader)
438
439                         self._manifest_size = len(coll_reader.manifest_text())
440                         _logger.debug("%s manifest_size %i", self, self._manifest_size)
441                 # end with llfuse.lock_released, re-acquire lock
442
443                 self.fresh()
444                 return True
445             finally:
446                 self._updating_lock.release()
447         except arvados.errors.NotFoundError as e:
448             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
449         except arvados.errors.ArgumentError as detail:
450             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
451             if self.collection_record is not None and "manifest_text" in self.collection_record:
452                 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
453         except Exception:
454             _logger.exception("arv-mount %s: error", self.collection_locator)
455             if self.collection_record is not None and "manifest_text" in self.collection_record:
456                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
457         self.invalidate()
458         return False
459
460     @use_counter
461     @check_update
462     def __getitem__(self, item):
463         if item == '.arvados#collection':
464             if self.collection_record_file is None:
465                 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
466                 self.inodes.add_entry(self.collection_record_file)
467             return self.collection_record_file
468         else:
469             return super(CollectionDirectory, self).__getitem__(item)
470
471     def __contains__(self, k):
472         if k == '.arvados#collection':
473             return True
474         else:
475             return super(CollectionDirectory, self).__contains__(k)
476
477     def invalidate(self):
478         self.collection_record = None
479         self.collection_record_file = None
480         super(CollectionDirectory, self).invalidate()
481
482     def persisted(self):
483         return (self.collection_locator is not None)
484
485     def objsize(self):
486         # This is an empirically-derived heuristic to estimate the memory used
487         # to store this collection's metadata.  Calculating the memory
488         # footprint directly would be more accurate, but also more complicated.
489         return self._manifest_size * 128
490
491     def finalize(self):
492         if self.collection is not None:
493             if self.writable():
494                 self.collection.save()
495             self.collection.stop_threads()
496
497
498 class TmpCollectionDirectory(CollectionDirectoryBase):
499     """A directory backed by an Arvados collection that never gets saved.
500
501     This supports using Keep as scratch space. A userspace program can
502     read the .arvados#collection file to get a current manifest in
503     order to save a snapshot of the scratch data or use it as a crunch
504     job output.
505     """
506
507     class UnsaveableCollection(arvados.collection.Collection):
508         def save(self):
509             pass
510         def save_new(self):
511             pass
512
513     def __init__(self, parent_inode, inodes, api_client, num_retries):
514         collection = self.UnsaveableCollection(
515             api_client=api_client,
516             keep_client=api_client.keep,
517             num_retries=num_retries)
518         super(TmpCollectionDirectory, self).__init__(
519             parent_inode, inodes, collection)
520         self.collection_record_file = None
521         self.populate(self.mtime())
522
523     def on_event(self, *args, **kwargs):
524         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
525         if self.collection_record_file:
526             with llfuse.lock:
527                 self.collection_record_file.invalidate()
528             self.inodes.invalidate_inode(self.collection_record_file.inode)
529             _logger.debug("%s invalidated collection record", self)
530
531     def collection_record(self):
532         with llfuse.lock_released:
533             return {
534                 "uuid": None,
535                 "manifest_text": self.collection.manifest_text(),
536                 "portable_data_hash": self.collection.portable_data_hash(),
537             }
538
539     def __contains__(self, k):
540         return (k == '.arvados#collection' or
541                 super(TmpCollectionDirectory, self).__contains__(k))
542
543     @use_counter
544     def __getitem__(self, item):
545         if item == '.arvados#collection':
546             if self.collection_record_file is None:
547                 self.collection_record_file = FuncToJSONFile(
548                     self.inode, self.collection_record)
549                 self.inodes.add_entry(self.collection_record_file)
550             return self.collection_record_file
551         return super(TmpCollectionDirectory, self).__getitem__(item)
552
553     def persisted(self):
554         return False
555
556     def writable(self):
557         return True
558
559     def want_event_subscribe(self):
560         return False
561
562     def finalize(self):
563         self.collection.stop_threads()
564
565     def invalidate(self):
566         if self.collection_record_file:
567             self.collection_record_file.invalidate()
568         super(TmpCollectionDirectory, self).invalidate()
569
570
571 class MagicDirectory(Directory):
572     """A special directory that logically contains the set of all extant keep locators.
573
574     When a file is referenced by lookup(), it is tested to see if it is a valid
575     keep locator to a manifest, and if so, loads the manifest contents as a
576     subdirectory of this directory with the locator as the directory name.
577     Since querying a list of all extant keep locators is impractical, only
578     collections that have already been accessed are visible to readdir().
579
580     """
581
582     README_TEXT = """
583 This directory provides access to Arvados collections as subdirectories listed
584 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
585 the form '1234567890abcdef0123456789abcdef+123').
586
587 Note that this directory will appear empty until you attempt to access a
588 specific collection subdirectory (such as trying to 'cd' into it), at which
589 point the collection will actually be looked up on the server and the directory
590 will appear if it exists.
591
592 """.lstrip()
593
594     def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
595         super(MagicDirectory, self).__init__(parent_inode, inodes)
596         self.api = api
597         self.num_retries = num_retries
598         self.pdh_only = pdh_only
599
600     def __setattr__(self, name, value):
601         super(MagicDirectory, self).__setattr__(name, value)
602         # When we're assigned an inode, add a README.
603         if ((name == 'inode') and (self.inode is not None) and
604               (not self._entries)):
605             self._entries['README'] = self.inodes.add_entry(
606                 StringFile(self.inode, self.README_TEXT, time.time()))
607             # If we're the root directory, add an identical by_id subdirectory.
608             if self.inode == llfuse.ROOT_INODE:
609                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
610                         self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
611
612     def __contains__(self, k):
613         if k in self._entries:
614             return True
615
616         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
617             return False
618
619         try:
620             e = self.inodes.add_entry(CollectionDirectory(
621                     self.inode, self.inodes, self.api, self.num_retries, k))
622
623             if e.update():
624                 if k not in self._entries:
625                     self._entries[k] = e
626                 else:
627                     self.inodes.del_entry(e)
628                 return True
629             else:
630                 self.inodes.invalidate_entry(self.inode, k)
631                 self.inodes.del_entry(e)
632                 return False
633         except Exception as ex:
634             _logger.debug('arv-mount exception keep %s', ex)
635             self.inodes.del_entry(e)
636             return False
637
638     def __getitem__(self, item):
639         if item in self:
640             return self._entries[item]
641         else:
642             raise KeyError("No collection with id " + item)
643
644     def clear(self):
645         pass
646
647     def want_event_subscribe(self):
648         return not self.pdh_only
649
650
651 class RecursiveInvalidateDirectory(Directory):
652     def invalidate(self):
653         try:
654             super(RecursiveInvalidateDirectory, self).invalidate()
655             for a in self._entries:
656                 self._entries[a].invalidate()
657         except Exception:
658             _logger.exception()
659
660
661 class TagsDirectory(RecursiveInvalidateDirectory):
662     """A special directory that contains as subdirectories all tags visible to the user."""
663
664     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
665         super(TagsDirectory, self).__init__(parent_inode, inodes)
666         self.api = api
667         self.num_retries = num_retries
668         self._poll = True
669         self._poll_time = poll_time
670
671     def want_event_subscribe(self):
672         return True
673
674     @use_counter
675     def update(self):
676         with llfuse.lock_released:
677             tags = self.api.links().list(
678                 filters=[['link_class', '=', 'tag']],
679                 select=['name'], distinct=True
680                 ).execute(num_retries=self.num_retries)
681         if "items" in tags:
682             self.merge(tags['items'],
683                        lambda i: i['name'],
684                        lambda a, i: a.tag == i['name'],
685                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
686
687
688 class TagDirectory(Directory):
689     """A special directory that contains as subdirectories all collections visible
690     to the user that are tagged with a particular tag.
691     """
692
693     def __init__(self, parent_inode, inodes, api, num_retries, tag,
694                  poll=False, poll_time=60):
695         super(TagDirectory, self).__init__(parent_inode, inodes)
696         self.api = api
697         self.num_retries = num_retries
698         self.tag = tag
699         self._poll = poll
700         self._poll_time = poll_time
701
702     def want_event_subscribe(self):
703         return True
704
705     @use_counter
706     def update(self):
707         with llfuse.lock_released:
708             taggedcollections = self.api.links().list(
709                 filters=[['link_class', '=', 'tag'],
710                          ['name', '=', self.tag],
711                          ['head_uuid', 'is_a', 'arvados#collection']],
712                 select=['head_uuid']
713                 ).execute(num_retries=self.num_retries)
714         self.merge(taggedcollections['items'],
715                    lambda i: i['head_uuid'],
716                    lambda a, i: a.collection_locator == i['head_uuid'],
717                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
718
719
720 class ProjectDirectory(Directory):
721     """A special directory that contains the contents of a project."""
722
723     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
724                  poll=False, poll_time=60):
725         super(ProjectDirectory, self).__init__(parent_inode, inodes)
726         self.api = api
727         self.num_retries = num_retries
728         self.project_object = project_object
729         self.project_object_file = None
730         self.project_uuid = project_object['uuid']
731         self._poll = poll
732         self._poll_time = poll_time
733         self._updating_lock = threading.Lock()
734         self._current_user = None
735
736     def want_event_subscribe(self):
737         return True
738
739     def createDirectory(self, i):
740         if collection_uuid_pattern.match(i['uuid']):
741             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
742         elif group_uuid_pattern.match(i['uuid']):
743             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
744         elif link_uuid_pattern.match(i['uuid']):
745             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
746                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
747             else:
748                 return None
749         elif uuid_pattern.match(i['uuid']):
750             return ObjectFile(self.parent_inode, i)
751         else:
752             return None
753
754     def uuid(self):
755         return self.project_uuid
756
757     @use_counter
758     def update(self):
759         if self.project_object_file == None:
760             self.project_object_file = ObjectFile(self.inode, self.project_object)
761             self.inodes.add_entry(self.project_object_file)
762
763         def namefn(i):
764             if 'name' in i:
765                 if i['name'] is None or len(i['name']) == 0:
766                     return None
767                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
768                     # collection or subproject
769                     return i['name']
770                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
771                     # name link
772                     return i['name']
773                 elif 'kind' in i and i['kind'].startswith('arvados#'):
774                     # something else
775                     return "{}.{}".format(i['name'], i['kind'][8:])
776             else:
777                 return None
778
779         def samefn(a, i):
780             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
781                 return a.uuid() == i['uuid']
782             elif isinstance(a, ObjectFile):
783                 return a.uuid() == i['uuid'] and not a.stale()
784             return False
785
786         try:
787             with llfuse.lock_released:
788                 self._updating_lock.acquire()
789                 if not self.stale():
790                     return
791
792                 if group_uuid_pattern.match(self.project_uuid):
793                     self.project_object = self.api.groups().get(
794                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
795                 elif user_uuid_pattern.match(self.project_uuid):
796                     self.project_object = self.api.users().get(
797                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
798
799                 contents = arvados.util.list_all(self.api.groups().contents,
800                                                  self.num_retries, uuid=self.project_uuid)
801
802             # end with llfuse.lock_released, re-acquire lock
803
804             self.merge(contents,
805                        namefn,
806                        samefn,
807                        self.createDirectory)
808         finally:
809             self._updating_lock.release()
810
811     @use_counter
812     @check_update
813     def __getitem__(self, item):
814         if item == '.arvados#project':
815             return self.project_object_file
816         else:
817             return super(ProjectDirectory, self).__getitem__(item)
818
819     def __contains__(self, k):
820         if k == '.arvados#project':
821             return True
822         else:
823             return super(ProjectDirectory, self).__contains__(k)
824
825     @use_counter
826     @check_update
827     def writable(self):
828         with llfuse.lock_released:
829             if not self._current_user:
830                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
831             return self._current_user["uuid"] in self.project_object["writable_by"]
832
833     def persisted(self):
834         return True
835
836     @use_counter
837     @check_update
838     def mkdir(self, name):
839         try:
840             with llfuse.lock_released:
841                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
842                                                     "name": name,
843                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
844             self.invalidate()
845         except apiclient_errors.Error as error:
846             _logger.error(error)
847             raise llfuse.FUSEError(errno.EEXIST)
848
849     @use_counter
850     @check_update
851     def rmdir(self, name):
852         if name not in self:
853             raise llfuse.FUSEError(errno.ENOENT)
854         if not isinstance(self[name], CollectionDirectory):
855             raise llfuse.FUSEError(errno.EPERM)
856         if len(self[name]) > 0:
857             raise llfuse.FUSEError(errno.ENOTEMPTY)
858         with llfuse.lock_released:
859             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
860         self.invalidate()
861
862     @use_counter
863     @check_update
864     def rename(self, name_old, name_new, src):
865         if not isinstance(src, ProjectDirectory):
866             raise llfuse.FUSEError(errno.EPERM)
867
868         ent = src[name_old]
869
870         if not isinstance(ent, CollectionDirectory):
871             raise llfuse.FUSEError(errno.EPERM)
872
873         if name_new in self:
874             # POSIX semantics for replacing one directory with another is
875             # tricky (the target directory must be empty, the operation must be
876             # atomic which isn't possible with the Arvados API as of this
877             # writing) so don't support that.
878             raise llfuse.FUSEError(errno.EPERM)
879
880         self.api.collections().update(uuid=ent.uuid(),
881                                       body={"owner_uuid": self.uuid(),
882                                             "name": name_new}).execute(num_retries=self.num_retries)
883
884         # Acually move the entry from source directory to this directory.
885         del src._entries[name_old]
886         self._entries[name_new] = ent
887         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
888
889
890 class SharedDirectory(Directory):
891     """A special directory that represents users or groups who have shared projects with me."""
892
893     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
894                  poll=False, poll_time=60):
895         super(SharedDirectory, self).__init__(parent_inode, inodes)
896         self.api = api
897         self.num_retries = num_retries
898         self.current_user = api.users().current().execute(num_retries=num_retries)
899         self._poll = True
900         self._poll_time = poll_time
901
902     @use_counter
903     def update(self):
904         with llfuse.lock_released:
905             all_projects = arvados.util.list_all(
906                 self.api.groups().list, self.num_retries,
907                 filters=[['group_class','=','project']])
908             objects = {}
909             for ob in all_projects:
910                 objects[ob['uuid']] = ob
911
912             roots = []
913             root_owners = {}
914             for ob in all_projects:
915                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
916                     roots.append(ob)
917                     root_owners[ob['owner_uuid']] = True
918
919             lusers = arvados.util.list_all(
920                 self.api.users().list, self.num_retries,
921                 filters=[['uuid','in', list(root_owners)]])
922             lgroups = arvados.util.list_all(
923                 self.api.groups().list, self.num_retries,
924                 filters=[['uuid','in', list(root_owners)]])
925
926             users = {}
927             groups = {}
928
929             for l in lusers:
930                 objects[l["uuid"]] = l
931             for l in lgroups:
932                 objects[l["uuid"]] = l
933
934             contents = {}
935             for r in root_owners:
936                 if r in objects:
937                     obr = objects[r]
938                     if obr.get("name"):
939                         contents[obr["name"]] = obr
940                     #elif obr.get("username"):
941                     #    contents[obr["username"]] = obr
942                     elif "first_name" in obr:
943                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
944
945
946             for r in roots:
947                 if r['owner_uuid'] not in objects:
948                     contents[r['name']] = r
949
950         # end with llfuse.lock_released, re-acquire lock
951
952         try:
953             self.merge(contents.items(),
954                        lambda i: i[0],
955                        lambda a, i: a.uuid() == i[1]['uuid'],
956                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
957         except Exception:
958             _logger.exception()
959
960     def want_event_subscribe(self):
961         return True