10117: Additional refinement: a directory not in use but still referenced needs
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
19 _logger = logging.getLogger('arvados.arvados_fuse')
20
21
# Characters that neither FUSE nor Linux can carry inside a single path
# component.  Collection filenames containing them are shown with the
# offending characters replaced by underscores in the fuse mount.
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return a version of `dirty` that is usable as a directory entry name.

    None is passed through unchanged.  The empty string and '.' become '_',
    '..' becomes '__', and in any other name each NUL or '/' character is
    replaced with '_'.
    """
    if dirty is None:
        return None

    # Names that are reserved (or empty) get a fixed substitute.
    reserved = {'': '_', '.': '_', '..': '__'}
    if dirty in reserved:
        return reserved[dirty]

    return _disallowed_filename_characters.sub('_', dirty)
40
41
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    Subclasses override update() to refresh the entries when the directory
    is stale, and override the mutation methods (create, mkdir, unlink,
    rmdir, rename) and want_event_subscribe() as appropriate.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory

        :inodes: the inode table object used to register, invalidate and
        delete entries

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; the generic directory is not persisted."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        HTTP errors from the API client are logged as warnings and
        otherwise ignored, so a failed refresh does not propagate to the
        caller.
        """
        if self.stale():
            try:
                self.update()
            except apiclient_errors.HttpError as e:
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        # Return a copy so callers can iterate while entries change.
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children=False):
        """Return True if this directory or any of its entries has a reference.

        :only_children: when True, skip the check on this directory itself
        and consider only its entries.
        """
        if not only_children and super(Directory, self).has_ref():
            return True
        return any(v.has_ref() for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[n])
        self.inodes.invalidate_inode(self.inode)
        self.invalidate()

    def invalidate(self):
        """Invalidate this directory and, recursively, all of its entries.

        Any exception raised along the way is logged and swallowed.
        """
        try:
            super(Directory, self).invalidate()
            for n, e in list(self._entries.items()):
                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                e.invalidate()
            self.inodes.invalidate_inode(self.inode)
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # without one would raise TypeError and hide the real error.
            _logger.exception("Error invalidating directory inode %s", self.inode)

    def mtime(self):
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
222
223
class CollectionDirectoryBase(Directory):
    """Expose an Arvados Collection as a directory.

    Used directly for subcollections, and as the base class of
    CollectionDirectory, which adds loading and saving of Collection
    records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object reports additions, removals and modifications
    through a notify callback (`CollectionDirectoryBase.on_event`), and the
    FUSE inodes and directory entries are created, deleted or invalidated
    in response to those events.

    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry called `name` for the collection item `item`.

        An item that already carries a `fuse_entry` is reparented (the entry
        must be marked dead and still have an inode); a subcollection gets a
        new CollectionDirectoryBase which is populated immediately; anything
        else gets a new FuseArvadosFile.  The resulting entry is recorded on
        the item as `fuse_entry`.
        """
        name = sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            recycled = item.fuse_entry
            if recycled.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if recycled.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            recycled.dead = False
            self._entries[name] = recycled
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, item)
            # Register the entry before populating it.
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            leaf = FuseArvadosFile(self.inode, item, mtime)
            self._entries[name] = self.inodes.add_entry(leaf)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback: mirror ADD/DEL/MOD events into FUSE.

        Events for collections other than this directory's own are ignored.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries.pop(name)
                self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                    self.inodes.invalidate_inode(item.fuse_entry.inode)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Subscribe to collection events and add an entry for every item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for child_name, child in self.collection.items():
            self.new_entry(child_name, child, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with the llfuse lock released."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` by opening it for writing and closing it."""
        with llfuse.lock_released:
            handle = self.collection.open(name, "w")
            handle.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def _remove_and_flush(self, name):
        # Shared body of unlink() and rmdir(): drop the item from the
        # collection with the llfuse lock released, then save.
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def unlink(self, name):
        self._remove_and_flush(name)

    @use_counter
    @check_update
    def rmdir(self, name):
        self._remove_and_flush(name)

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EPERM if `src` is not a collection
        directory, ENOTEMPTY when replacing a non-empty directory, ENOTDIR
        when a directory would replace a file, and EISDIR when a file would
        replace a directory.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            ent_is_file = isinstance(ent, FuseArvadosFile)
            ent_is_dir = isinstance(ent, CollectionDirectoryBase)
            tgt_is_file = isinstance(tgt, FuseArvadosFile)
            tgt_is_dir = isinstance(tgt, CollectionDirectoryBase)

            if ent_is_file and tgt_is_file:
                # file over file: allowed, nothing to check
                pass
            elif ent_is_dir and tgt_is_dir:
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif ent_is_dir and tgt_is_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif ent_is_file and tgt_is_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and detach from the underlying collection."""
        result = super(CollectionDirectoryBase, self).clear()
        self.collection = None
        self._manifest_size = 0
        return result
346
347
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a (not yet loaded) collection directory.

        :api: API client; its discovery document is consulted for
        'blobSignatureTtl' to choose the poll interval

        :num_retries: retry count passed to the collection objects

        :collection_record: either a collection record dict (its 'uuid'
        and 'modified_at' are used) or a locator string, or None

        :explicit_collection: accepted but not used by this constructor
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            # Log the exception class (what sys.exc_info()[0] would give)
            # without depending on the sys module being imported.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", type(e))
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Default to read-only so writable() is safe to call even when no
        # locator was supplied.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` matches this directory's locator by uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Return True if the locator is a uuid (False for None or a portable data hash)."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the current contents with the collection in `coll_reader`.

        Existing entries are cleared (if this directory has an inode), the
        record, mtime and locator are updated from `new_collection_record`,
        and the directory is repopulated from `coll_reader`.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection behind this directory.

        :to_record_version: record version to update to; if the loaded
        collection already knows it as a past version, no refresh is done

        Returns True when the directory is up to date (including when there
        was nothing to do).  On any error the problem is logged, the
        directory is invalidated, and False is returned.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Already loaded and addressed by portable data hash.
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed us while we waited for
                        # the lock; that is a success, not a failure.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            # The record file is created on first access.
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached collection record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if it is writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()
514
515
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Directory backed by an Arvados collection that is never saved.

    This lets Keep be used as scratch space.  A userspace program can read
    the .arvados#collection file to obtain the current manifest, either to
    snapshot the scratch data or to use it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save() and save_new() do nothing."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle a collection event, then invalidate the record file if any."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict (uuid None) from the collection's current state."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
            }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)

        # The record file is created on first access.
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if one exists) and then the directory."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
587
588
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """Create the directory.

        :pdh_only: when True, only portable data hashes (not uuids) are
        accepted as collection names.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if `k` names an entry, loading the collection on demand.

        A name that is not already an entry must look like a portable data
        hash (or a uuid, unless pdh_only is set); it is then looked up as a
        collection and added as a subdirectory if the lookup succeeds.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Keep track of the candidate entry so the error path below can
        # tell whether there is anything to clean up.
        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # Someone else added it while we were loading.
                    self.inodes.del_entry(e)
                return True
            else:
                # Encode the name like every other invalidate_entry caller.
                self.inodes.invalidate_entry(self.inode, k.encode(self.inodes.encoding))
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Entries of the magic directory are deliberately kept.
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
667
668
class TagsDirectory(Directory):
    """Special directory with one subdirectory per tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List the distinct tag link names and merge them into the entries."""
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                ).execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
694
695
class TagDirectory(Directory):
    """Special directory holding, as subdirectories, every collection visible
    to the user that carries one particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List the collections tagged with self.tag and merge them into the entries."""
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)

        def collection_name(link):
            return link['head_uuid']

        def is_same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   collection_name,
                   is_same_collection,
                   make_collection_directory)
726
727
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are built from the items returned by the API server's
    groups().contents listing for project_uuid.  The pseudo-entry
    '.arvados#project' exposes the project record itself as a file.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Create an (initially empty) directory for project_object.

        parent_inode -- integer inode number of the containing directory
        inodes -- object handed to Directory.__init__; used in this class
                  through add_entry(), invalidate_entry() and .encoding
        api -- Arvados API client used for every server request
        num_retries -- passed as num_retries to every execute() call
        project_object -- API record of the project (or user); must
                          contain 'uuid'
        poll, poll_time -- stored and handed on to child ProjectDirectory
                           objects created by createDirectory()
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Created lazily by the first update().
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Held for the whole of update() so only one refresh runs at a time.
        self._updating_lock = threading.Lock()
        # Cached result of users().current(), filled in by writable().
        self._current_user = None

    def want_event_subscribe(self):
        """Always return True."""
        return True

    def createDirectory(self, i):
        """Return the File/Directory object for contents item i, or None.

        The type is selected from the format of i['uuid']:
        collections and collection links become a CollectionDirectory,
        groups become a ProjectDirectory, any other uuid becomes an
        ObjectFile.  Links that do not point at a collection, and items
        whose uuid matches none of the patterns, yield None.
        """
        item_uuid = i['uuid']
        if collection_uuid_pattern.match(item_uuid):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        if group_uuid_pattern.match(item_uuid):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        if link_uuid_pattern.match(item_uuid):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            return None
        if uuid_pattern.match(item_uuid):
            # NOTE(review): update() builds its ObjectFile with self.inode;
            # confirm that parent_inode is really intended here.
            return ObjectFile(self.parent_inode, i)
        return None

    def uuid(self):
        """Return the uuid of the project this directory represents."""
        return self.project_uuid

    @use_counter
    def update(self):
        """Refresh the project record and the directory entries.

        The API requests are made with the llfuse lock released; the
        lock is re-acquired before the results are merged into the
        entries.  _updating_lock is held throughout, and if the
        directory is no longer stale once that lock has been obtained
        the method returns without contacting the server.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            """Return the filename to use for item i, or None to omit it."""
            if 'name' not in i or i['name'] is None or len(i['name']) == 0:
                return None
            if collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                # collection or subproject
                return i['name']
            if link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                # name link
                return i['name']
            if 'kind' in i and i['kind'].startswith('arvados#'):
                # something else: append the kind with its 'arvados#'
                # prefix (8 characters) stripped
                return "{}.{}".format(i['name'], i['kind'][8:])
            return None

        def samefn(a, i):
            """Return True if existing entry a still represents item i."""
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().contents,
                                                 self.num_retries, uuid=self.project_uuid)

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       namefn,
                       samefn,
                       self.createDirectory)
        finally:
            self._updating_lock.release()

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry named item.

        '.arvados#project' maps to the file holding the project record;
        every other name is looked up by the base class.
        """
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """Return True for '.arvados#project' or any base-class entry."""
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user is listed in writable_by.

        The current user record is requested from the API server on the
        first call and cached in _current_user afterwards.
        """
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object["writable_by"]

    def persisted(self):
        """Always return True."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called name owned by this project.

        The directory is invalidated on success.  Any apiclient error
        is logged and reported to the caller as FUSEError(EEXIST).
        """
        try:
            with llfuse.lock_released:
                self.api.collections().create(body={"owner_uuid": self.project_uuid,
                                                    "name": name,
                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection called name from this project.

        Raises FUSEError with ENOENT if there is no such entry, EPERM if
        the entry is not a CollectionDirectory, and ENOTEMPTY if it
        still has entries.  The directory is invalidated on success.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Look the entry up exactly once, while the llfuse lock is still
        # held, so that no directory state is read (and no lookup-driven
        # refresh can start) after the lock has been released below.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        collection_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection name_old from directory src to name_new here.

        Only a CollectionDirectory coming from another ProjectDirectory
        can be moved, and the target name must not exist yet; every
        other case raises FUSEError(EPERM).  The collection's owner_uuid
        and name are updated on the API server before the local entries
        are changed.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): unlike mkdir()/rmdir(), this request is made
        # without releasing the llfuse lock -- confirm that is intended.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
896
897
898 class SharedDirectory(Directory):
899     """A special directory that represents users or groups who have shared projects with me."""
900
901     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
902                  poll=False, poll_time=60):
903         super(SharedDirectory, self).__init__(parent_inode, inodes)
904         self.api = api
905         self.num_retries = num_retries
906         self.current_user = api.users().current().execute(num_retries=num_retries)
907         self._poll = True
908         self._poll_time = poll_time
909
910     @use_counter
911     def update(self):
912         with llfuse.lock_released:
913             all_projects = arvados.util.list_all(
914                 self.api.groups().list, self.num_retries,
915                 filters=[['group_class','=','project']])
916             objects = {}
917             for ob in all_projects:
918                 objects[ob['uuid']] = ob
919
920             roots = []
921             root_owners = {}
922             for ob in all_projects:
923                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
924                     roots.append(ob)
925                     root_owners[ob['owner_uuid']] = True
926
927             lusers = arvados.util.list_all(
928                 self.api.users().list, self.num_retries,
929                 filters=[['uuid','in', list(root_owners)]])
930             lgroups = arvados.util.list_all(
931                 self.api.groups().list, self.num_retries,
932                 filters=[['uuid','in', list(root_owners)]])
933
934             users = {}
935             groups = {}
936
937             for l in lusers:
938                 objects[l["uuid"]] = l
939             for l in lgroups:
940                 objects[l["uuid"]] = l
941
942             contents = {}
943             for r in root_owners:
944                 if r in objects:
945                     obr = objects[r]
946                     if obr.get("name"):
947                         contents[obr["name"]] = obr
948                     #elif obr.get("username"):
949                     #    contents[obr["username"]] = obr
950                     elif "first_name" in obr:
951                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
952
953
954             for r in roots:
955                 if r['owner_uuid'] not in objects:
956                     contents[r['name']] = r
957
958         # end with llfuse.lock_released, re-acquire lock
959
960         try:
961             self.merge(contents.items(),
962                        lambda i: i[0],
963                        lambda a, i: a.uuid() == i[1]['uuid'],
964                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
965         except Exception:
966             _logger.exception()
967
968     def want_event_subscribe(self):
969         return True