8712: Set self.collection = None when clearing the contents of a
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
19 _logger = logging.getLogger('arvados.arvados_fuse')
20
21
# Characters that neither FUSE nor Linux can represent inside a filename.
# (A collection filename containing one of them shows up in the fuse mount
# with that character replaced by an underscore.)
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return `dirty` rewritten so it is usable as a directory entry name.

    None is passed through unchanged.  The empty string and '.' both map
    to '_', '..' maps to '__', and in every other name each NUL or '/'
    character is replaced with '_'.
    """
    if dirty is None:
        return None
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
40
41
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory;
        any other type raises Exception.

        :inodes: the inode registry.  This class calls its touch(),
        add_entry(), del_entry(), invalidate_entry() and
        invalidate_inode() methods and reads its `encoding` attribute.
        """

        super(Directory, self).__init__()

        # No inode number is known at construction time.
        # NOTE(review): presumably assigned when the directory is
        # registered with `inodes` -- confirm against the registry code.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; the base directory does not report itself as persisted."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and
        swallowed, so a failed refresh leaves the existing entries in
        place.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use warning() like the
                # rest of this module.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        # Return a copy so callers are not affected by later changes to
        # the entries dict.
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode registry, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            # Only bump the modification time when the listing really changed.
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Does nothing and returns False when the directory is in use,
        unless `force` is true.  Every child is asked to clear() first; if
        any child refuses, the original entries are put back and False is
        returned.  Returns True once all entries have been removed.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            for n in oldentries:
                if not oldentries[n].clear(force):
                    # A child could not be cleared: restore the listing.
                    self._entries = oldentries
                    return False
            for n in oldentries:
                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                self.inodes.del_entry(oldentries[n])
            self.inodes.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        """Return the stored modification time of this directory."""
        return self._mtime

    def writable(self):
        """Return False; the base directory is read-only."""
        return False

    def flush(self):
        """Do nothing; the base directory has nothing to write back."""
        pass

    # The operations below have no generic implementation; subclasses that
    # support them override these methods.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
204
205
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, collection):
        """Create a directory backed by `collection`.

        `collection` may be None; CollectionDirectory passes None and
        assigns the real object later.
        """
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry called `name` for the collection item `item`.

        An item that already carries a `fuse_entry` is re-attached here,
        which is only allowed when that entry is marked dead and still has
        an inode; otherwise Exception is raised.  A subcollection gets a
        new, populated CollectionDirectoryBase and anything else gets a
        FuseArvadosFile.  In every case `item.fuse_entry` ends up pointing
        at the entry stored under the sanitized name.
        """
        name = sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Notification callback registered with the collection in populate().

        Events for any collection other than this directory's own are
        ignored.  ADD creates an entry, DEL removes the entry and
        invalidates the kernel's cached name, and MOD invalidates the
        inode of the affected entry.  The llfuse lock is held while the
        entries are changed.
        """
        if collection == self.collection:
            name = sanitize_filename(name)
            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries[name]
                    del self._entries[name]
                    self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                    self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        self.inodes.invalidate_inode(item.fuse_entry.inode)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Set the mtime, subscribe to collection events and add all items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in self.collection.items():
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        """Return whatever the underlying collection reports as writable."""
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with the llfuse lock released meanwhile."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` by opening it for writing and closing it."""
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create the subcollection `name` in the underlying collection."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EPERM when `src` is not a collection
        directory, ENOTEMPTY when a directory would replace a non-empty
        directory, ENOTDIR when a directory would replace a file, and
        EISDIR when a file would replace a directory.  Both directories
        are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                # file over file: plain overwrite
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self, force=False):
        """Delete all entries and, on success, drop the collection reference.

        Returns the result of Directory.clear(): True when the entries
        were removed, False when the directory was left untouched.  The
        result has to be passed on because a parent directory's clear()
        treats a falsy return from a child as "could not clear" and rolls
        back.  The collection is only released when the entries really
        went away, so a directory that was not cleared keeps working.
        """
        cleared = super(CollectionDirectoryBase, self).clear(force)
        if cleared:
            self.collection = None
        return cleared
326
327
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create the root directory for one collection.

        :api: API client; also supplies the Keep client (`api.keep`).

        :num_retries: retry count passed to the collection objects.

        :collection_record: either a collection record dict (its 'uuid'
        becomes the locator and 'modified_at' the mtime) or a locator
        string, or None.

        :explicit_collection: accepted but not used by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Poll at half the blob signature TTL (default TTL: two hours).
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            # Log the exception type without relying on the sys module,
            # which this file does not import.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", type(e))
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot hit a missing
        # attribute when there is no locator.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` has this locator as its uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection; before it is loaded, use the locator-based guess."""
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Return True when the locator matches the uuid pattern."""
        return (uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are force-cleared once this directory has an
        inode.  The record, mtime and locator are taken from
        `new_collection_record`, and an already-created record file is
        refreshed with it.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Bring the directory up to date with the collection.

        Returns True when the directory is up to date afterwards.  On any
        error the problem is logged, the directory is invalidated and
        False is returned.

        :to_record_version: passed to the loaded collection's
        known_past_version(); when that returns true the collection is
        not updated again.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Already loaded and addressed by portable data hash.
                return True

            if self.collection_locator is None:
                # Nothing to fetch.
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed the directory while we
                        # waited for the lock; report success like every
                        # other up-to-date path.
                        return True

                    _logger.debug("Updating %s", to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        # Only reached after one of the handlers above ran.
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            # The record file is created on first access.
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        # The special record file is always reported as present.
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then invalidate as usual."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when this directory has a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save a writable loaded collection and stop the collection's threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()
494
495
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Scratch directory whose backing Arvados collection is never saved.

    Lets Keep be used as temporary space.  Reading the
    .arvados#collection file yields the current manifest, which a
    userspace program can use to snapshot the scratch data or to record
    it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations do nothing."""

        def save(self):
            """Deliberately a no-op."""

        def save_new(self):
            """Deliberately a no-op."""

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        """Build an empty unsaveable collection and populate from it."""
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as usual, then invalidate the record file, if any."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        record_file = self.collection_record_file
        if not record_file:
            return
        # Only the file's own invalidate() runs under the llfuse lock.
        with llfuse.lock:
            record_file.invalidate()
        self.inodes.invalidate_inode(record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Return a record dict describing the collection's current state."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
            }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        # The record file is created the first time it is requested.
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        """Scratch collections are never persisted."""
        return False

    def writable(self):
        """Scratch collections are always writable."""
        return True

    def want_event_subscribe(self):
        """No event subscription is wanted for scratch collections."""
        return False

    def finalize(self):
        """Stop the collection's threads."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file, if one exists, then this directory."""
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
567
568
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """Create the directory.

        :pdh_only: when true, only names matching the portable data hash
        pattern are looked up; names matching the uuid pattern are
        rejected.
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if `k` is an entry, loading the collection on demand.

        A name that is not yet an entry but looks like a portable data
        hash (or a uuid, unless pdh_only is set) is looked up by creating
        a CollectionDirectory and updating it.  On success it becomes an
        entry; on failure the temporary inode entry is removed again and
        False is returned.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Pre-bind so the exception handler can tell whether an inode entry
        # was actually created before the failure.
        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # update() released the lock and someone else added
                    # the same name meanwhile; keep theirs.
                    self.inodes.del_entry(e)
                return True
            else:
                # Encode the name the same way Directory.merge() and
                # Directory.clear() do before invalidating it.
                self.inodes.invalidate_entry(self.inode, k.encode(self.inodes.encoding))
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`, loading it on demand; KeyError if absent."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        """Do nothing: entries of this directory are never cleared."""
        pass

    def want_event_subscribe(self):
        """Return True unless this directory is restricted to portable data hashes."""
        return not self.pdh_only
647
648
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates every entry it holds."""

    def invalidate(self):
        """Invalidate this directory, then each of its entries.

        Any exception is logged with its traceback and swallowed, so
        invalidation is best-effort.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # without one raises TypeError from inside this handler.
            _logger.exception("Error invalidating entries of inode %s", self.inode)
657
658
class TagsDirectory(RecursiveInvalidateDirectory):
    """Special directory with one subdirectory per tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        """Store the API client and retry/poll settings; polling is always on."""
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll_time = poll_time
        self._poll = True

    def want_event_subscribe(self):
        """Event subscription is always wanted."""
        return True

    @use_counter
    def update(self):
        """List the distinct tag link names and merge them into the entries."""
        # The API request runs with the llfuse lock released.
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                )
            tags = request.execute(num_retries=self.num_retries)

        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
684
685
class TagDirectory(Directory):
    """Special directory listing, as subdirectories, every collection visible
    to the user that carries one particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """Remember the tag name together with the API and polling settings."""
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.tag = tag
        self.api = api
        self.num_retries = num_retries
        self._poll_time = poll_time
        self._poll = poll

    def want_event_subscribe(self):
        """Event subscription is always wanted."""
        return True

    @use_counter
    def update(self):
        """Fetch the tag links for this tag and merge their targets into the entries."""
        # The API request runs with the llfuse lock released.
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                )
            taggedcollections = request.execute(num_retries=self.num_retries)

        def target_uuid(link):
            return link['head_uuid']

        def is_same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   target_uuid,
                   is_same_collection,
                   make_collection_directory)
716
717
718 class ProjectDirectory(Directory):
719     """A special directory that contains the contents of a project."""
720
721     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
722                  poll=False, poll_time=60):
723         super(ProjectDirectory, self).__init__(parent_inode, inodes)
724         self.api = api
725         self.num_retries = num_retries
726         self.project_object = project_object
727         self.project_object_file = None
728         self.project_uuid = project_object['uuid']
729         self._poll = poll
730         self._poll_time = poll_time
731         self._updating_lock = threading.Lock()
732         self._current_user = None
733
734     def want_event_subscribe(self):
735         return True
736
737     def createDirectory(self, i):
738         if collection_uuid_pattern.match(i['uuid']):
739             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
740         elif group_uuid_pattern.match(i['uuid']):
741             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
742         elif link_uuid_pattern.match(i['uuid']):
743             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
744                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
745             else:
746                 return None
747         elif uuid_pattern.match(i['uuid']):
748             return ObjectFile(self.parent_inode, i)
749         else:
750             return None
751
752     def uuid(self):
753         return self.project_uuid
754
755     @use_counter
756     def update(self):
757         if self.project_object_file == None:
758             self.project_object_file = ObjectFile(self.inode, self.project_object)
759             self.inodes.add_entry(self.project_object_file)
760
761         def namefn(i):
762             if 'name' in i:
763                 if i['name'] is None or len(i['name']) == 0:
764                     return None
765                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
766                     # collection or subproject
767                     return i['name']
768                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
769                     # name link
770                     return i['name']
771                 elif 'kind' in i and i['kind'].startswith('arvados#'):
772                     # something else
773                     return "{}.{}".format(i['name'], i['kind'][8:])
774             else:
775                 return None
776
777         def samefn(a, i):
778             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
779                 return a.uuid() == i['uuid']
780             elif isinstance(a, ObjectFile):
781                 return a.uuid() == i['uuid'] and not a.stale()
782             return False
783
784         try:
785             with llfuse.lock_released:
786                 self._updating_lock.acquire()
787                 if not self.stale():
788                     return
789
790                 if group_uuid_pattern.match(self.project_uuid):
791                     self.project_object = self.api.groups().get(
792                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
793                 elif user_uuid_pattern.match(self.project_uuid):
794                     self.project_object = self.api.users().get(
795                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
796
797                 contents = arvados.util.list_all(self.api.groups().contents,
798                                                  self.num_retries, uuid=self.project_uuid)
799
800             # end with llfuse.lock_released, re-acquire lock
801
802             self.merge(contents,
803                        namefn,
804                        samefn,
805                        self.createDirectory)
806         finally:
807             self._updating_lock.release()
808
809     @use_counter
810     @check_update
811     def __getitem__(self, item):
812         if item == '.arvados#project':
813             return self.project_object_file
814         else:
815             return super(ProjectDirectory, self).__getitem__(item)
816
817     def __contains__(self, k):
818         if k == '.arvados#project':
819             return True
820         else:
821             return super(ProjectDirectory, self).__contains__(k)
822
823     @use_counter
824     @check_update
825     def writable(self):
826         with llfuse.lock_released:
827             if not self._current_user:
828                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
829             return self._current_user["uuid"] in self.project_object["writable_by"]
830
831     def persisted(self):
832         return True
833
834     @use_counter
835     @check_update
836     def mkdir(self, name):
837         try:
838             with llfuse.lock_released:
839                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
840                                                     "name": name,
841                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
842             self.invalidate()
843         except apiclient_errors.Error as error:
844             _logger.error(error)
845             raise llfuse.FUSEError(errno.EEXIST)
846
847     @use_counter
848     @check_update
849     def rmdir(self, name):
850         if name not in self:
851             raise llfuse.FUSEError(errno.ENOENT)
852         if not isinstance(self[name], CollectionDirectory):
853             raise llfuse.FUSEError(errno.EPERM)
854         if len(self[name]) > 0:
855             raise llfuse.FUSEError(errno.ENOTEMPTY)
856         with llfuse.lock_released:
857             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
858         self.invalidate()
859
860     @use_counter
861     @check_update
862     def rename(self, name_old, name_new, src):
863         if not isinstance(src, ProjectDirectory):
864             raise llfuse.FUSEError(errno.EPERM)
865
866         ent = src[name_old]
867
868         if not isinstance(ent, CollectionDirectory):
869             raise llfuse.FUSEError(errno.EPERM)
870
871         if name_new in self:
872             # POSIX semantics for replacing one directory with another is
873             # tricky (the target directory must be empty, the operation must be
874             # atomic which isn't possible with the Arvados API as of this
875             # writing) so don't support that.
876             raise llfuse.FUSEError(errno.EPERM)
877
878         self.api.collections().update(uuid=ent.uuid(),
879                                       body={"owner_uuid": self.uuid(),
880                                             "name": name_new}).execute(num_retries=self.num_retries)
881
882         # Acually move the entry from source directory to this directory.
883         del src._entries[name_old]
884         self._entries[name_new] = ent
885         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
886
887
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Set up the directory and look up the current user.

        parent_inode and inodes are passed on to Directory.  api is the
        API client and num_retries is handed to every execute() call.
        The current user record is fetched immediately through
        users().current().
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): `exclude` is accepted but never used, and the
        # `poll` argument is ignored -- polling is always switched on.
        # Presumably deliberate; confirm before changing.
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the list of shared projects and their owners.

        With the llfuse lock released, all groups with group_class
        'project' are listed.  A project is a "root" when its owner is
        neither the current user nor one of the listed projects.  The
        owners of the roots are then looked up as users and as groups.
        Each owner that is found becomes an entry named after its 'name'
        (or "first_name last_name" when it has no name but has a
        first_name).  Roots whose owner could not be found are listed
        under their own name.

        The result is merged into the directory entries, creating a
        ProjectDirectory for each new entry.  An exception raised by the
        merge is logged and not propagated.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            # Make the owners that were found resolvable by UUID.
            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if obr.get("name"):
                        contents[obr["name"]] = obr
                    #elif obr.get("username"):
                    #    contents[obr["username"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            # Roots whose owner record could not be fetched are listed
            # directly under their own name.
            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # with none raises TypeError and hides the original error.
            _logger.exception("arv-mount: error updating shared directory")

    def want_event_subscribe(self):
        """Report whether this directory wants an event subscription.

        Always answers True.
        """
        return True