12276: Reduce number of spurious invalidations sent to kernel.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import logging
6 import re
7 import time
8 import llfuse
9 import arvados
10 import apiclient
11 import functools
12 import threading
13 from apiclient import errors as apiclient_errors
14 import errno
15 import time
16
17 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
23 _logger = logging.getLogger('arvados.arvados_fuse')
24
25
# Pattern for the characters that neither FUSE nor Linux can represent
# inside a filename: NUL and '/'.  (If present in a collection filename,
# they will appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return a version of `dirty` that is usable as a directory entry name.

    None is passed through unchanged.  The empty string and '.' map to
    '_', '..' maps to '__', and in any other name every NUL or '/'
    character is replaced with '_'.
    """
    if dirty is None:
        return None
    if dirty == '..':
        return '__'
    if dirty in ('', '.'):
        return '_'
    return _disallowed_filename_characters.sub('_', dirty)
44
45
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the containing directory

        :inodes: the inode table object.  This class indexes it by inode
        number and calls its add_entry, del_entry, touch, invalidate_entry
        and invalidate_inode methods.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        # Starts out without an inode number of its own.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overriden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; subclasses override this."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and
        swallowed, so callers never see it.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list of (name, entry) pairs (a copy, not a view)."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        # (this includes entries whose name was reused by a different object)
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        # Only invalidate the directory inode and bump its mtime when an
        # entry was actually added or removed.
        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in self._entries.itervalues())

    def has_ref(self, only_children):
        """Return True if this directory or any of its entries has a reference.

        `only_children` is passed to the superclass check for this
        directory; entries are always asked with has_ref(False).
        """
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.itervalues())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate the parent's directory entry that refers to this object."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            _logger.debug("looking at %s %s, self is %s", k, v, self)
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        """Return the stored modification time."""
        return self._mtime

    def writable(self):
        """Return False; subclasses override this."""
        return False

    def flush(self):
        pass

    # The operations below are not implemented by the generic directory;
    # subclasses override the ones they support.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
228
229
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry called `name` for collection item `item`.

        An item that already carries a (dead) fuse_entry gets that entry
        revived and reused.  Otherwise a new subdirectory or file entry is
        registered in the inode table.  The entry is recorded on the item
        as `item.fuse_entry`.

        Raises Exception if the item's existing fuse_entry is not dead or
        has no inode.
        """
        name = sanitize_filename(name)
        has_fuse_entry = hasattr(item, "fuse_entry") and item.fuse_entry is not None
        if has_fuse_entry:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, item)
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            fileent = FuseArvadosFile(self.inode, item, mtime)
            self._entries[name] = self.inodes.add_entry(fileent)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback: mirror ADD/DEL/MOD into the directory.

        Events for any collection other than self.collection are ignored.
        The work is done while holding llfuse.lock.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries[name]
                del self._entries[name]
                self.inodes.invalidate_entry(self, name)
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                    self.inodes.invalidate_inode(item.fuse_entry)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name])

    def populate(self, mtime):
        """Subscribe to collection events and add an entry per collection item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry_name, entry_item in self.collection.items():
            self.new_entry(entry_name, entry_item, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released meanwhile."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create `name` by opening it for writing and closing it."""
        with llfuse.lock_released:
            writer = self.collection.open(name, "w")
            writer.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def _remove_and_flush(self, name):
        # Shared body of unlink() and rmdir(): drop the item from the
        # collection with llfuse.lock released, then flush.
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def unlink(self, name):
        self._remove_and_flush(name)

    @use_counter
    @check_update
    def rmdir(self, name):
        self._remove_and_flush(name)

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EPERM if `src` is not a collection
        directory; with ENOTEMPTY, ENOTDIR or EISDIR if an existing target
        cannot be replaced by the source entry.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            ent_is_file = isinstance(ent, FuseArvadosFile)
            ent_is_dir = isinstance(ent, CollectionDirectoryBase)
            tgt_is_file = isinstance(tgt, FuseArvadosFile)
            tgt_is_dir = isinstance(tgt, CollectionDirectoryBase)
            # file over file is always allowed
            if not (ent_is_file and tgt_is_file):
                if ent_is_dir and tgt_is_dir:
                    # a directory may only replace an empty directory
                    if len(tgt) > 0:
                        raise llfuse.FUSEError(errno.ENOTEMPTY)
                elif ent_is_dir and tgt_is_file:
                    raise llfuse.FUSEError(errno.ENOTDIR)
                elif ent_is_file and tgt_is_dir:
                    raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
350
351
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a collection root directory.

        :api: API client; also supplies the Keep client (`api.keep`)

        :num_retries: passed through to the collection objects

        :collection_record: either a collection record dict (its 'uuid'
        becomes the locator and 'modified_at' the mtime) or a locator
        string / None

        :explicit_collection: accepted but not used by this class
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Poll at half the blob signature TTL (default TTL: 2 hours).
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            # The original handler called sys.exc_info() without `sys` being
            # imported, which raised NameError; log the exception type
            # directly instead.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", type(e))
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot raise AttributeError
        # when there is no locator.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` has this locator as uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection if there is one, else use the locator-based guess."""
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        return (uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are cleared first if this directory already has an
        inode.  A non-empty record updates the mtime, the locator and, if
        present, the '.arvados#collection' record file.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection behind this directory.

        Returns True when the directory is up to date afterwards, False
        when loading failed (the error is logged and the directory is
        invalidated).
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Already loaded, and the locator is a portable data hash.
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # No longer stale by the time we hold the lock, so
                        # there is nothing left to do.  Report success:
                        # callers such as MagicDirectory.__contains__ treat
                        # a falsy result as a failed lookup.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        # Only reached after one of the handlers above ran.
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        if item == '.arvados#collection':
            # The record file is created on first access.
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        # The record pseudo-file is always reported as present.
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the collection's threads, delete all entries, reset the size estimate."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
524
525
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        # Both save operations are deliberately no-ops.
        def save(self):
            pass
        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, scratch)
        # Created on first lookup of '.arvados#collection'.
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the collection event, then invalidate the record file if any."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict from the collection's current state.

        Runs with llfuse.lock released.
        """
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
            }

    def __contains__(self, k):
        # The record pseudo-file is always reported as present.
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if it exists) and then the directory."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
597
598
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # Only act when an actual inode number is assigned to a directory
        # that has no entries yet.
        if name != 'inode' or self.inode is None or self._entries:
            return

        # When we're assigned an inode, add a README.
        readme = StringFile(self.inode, self.README_TEXT, time.time())
        self._entries['README'] = self.inodes.add_entry(readme)
        # If we're the root directory, add an identical by_id subdirectory.
        if self.inode == llfuse.ROOT_INODE:
            self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if `k` is an entry, loading the collection on demand.

        A name that is not already an entry must look like a portable data
        hash, or (unless pdh_only is set) a uuid; it is then loaded as a
        CollectionDirectory.  Any failure is logged and reported as False.
        """
        if k in self._entries:
            return True

        looks_like_pdh = portable_data_hash_pattern.match(k)
        if not looks_like_pdh and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if not e.update():
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False

            if k in self._entries:
                # An entry for k appeared in the meantime; drop ours.
                self.inodes.del_entry(e)
            else:
                self._entries[k] = e
            return True
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        # The membership test performs the on-demand load.
        if item not in self:
            raise KeyError("No collection with id " + item)
        return self._entries[item]

    def clear(self):
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
679
680
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup in __getitem__; they are added
        # to every subsequent listing in update().
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List tag links (with llfuse.lock released) and merge them in."""
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                )
            tags = request.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(i):
            return i['name']

        def same_tag(a, i):
            return a.tag == i['name']

        def make_tag_dir(i):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items']+[{"name": n} for n in self._extra],
                   tag_name,
                   same_tag,
                   make_tag_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry for tag `item`, querying the API if it is not listed."""
        if not super(TagsDirectory, self).__contains__(item):
            with llfuse.lock_released:
                tags = self.api.links().list(
                    filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
                ).execute(num_retries=self.num_retries)
            if tags["items"]:
                self._extra.add(item)
                self.update()
        return super(TagsDirectory, self).__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        # Fall back to the lookup in __getitem__, which may query the API.
        try:
            self[k]
        except KeyError:
            return False
        return True
733
734
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List the collections carrying this tag and merge them in.

        The API request runs with llfuse.lock released.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                )
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(i):
            return i['head_uuid']

        def same_collection(a, i):
            return a.collection_locator == i['head_uuid']

        def make_collection_dir(i):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name,
                   same_collection,
                   make_collection_dir)
764                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
765
766
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are subprojects and collections owned by the project.  The
    full listing is only fetched once items() has been called; until
    then individual names are resolved on demand by __getitem__().
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Set up an empty directory for `project_object`.

        project_object is the API record (a dict with at least 'uuid')
        of the project this directory represents.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Created lazily by update(); served as '.arvados#project'.
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes the API listing done by update().
        self._updating_lock = threading.Lock()
        # Cached result of users().current(), filled in by writable().
        self._current_user = None
        # Becomes True once items() has been called.
        self._full_listing = False

    def want_event_subscribe(self):
        """Always returns True."""
        return True

    def createDirectory(self, i):
        """Build the directory entry object for API record `i`.

        Returns a CollectionDirectory, ProjectDirectory or ObjectFile
        depending on the kind of uuid in i['uuid'], or None when the
        record cannot be represented.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            # .get(): a link record without 'head_kind' must not raise KeyError.
            if i.get('head_kind') == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        """Return the uuid of the project backing this directory."""
        return self.project_uuid

    def items(self):
        """Return the directory entries, enabling full listings from now on."""
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the (unsanitized) entry name for API record `i`, or None.

        Collections, subprojects and name links to collections use their
        'name' directly; any other record with an 'arvados#...' kind gets
        "<name>.<kind without the 'arvados#' prefix>".  Records without a
        usable name, or that match none of these cases, yield None.
        """
        name = i.get('name')
        if not name:
            return None

        # Missing keys are treated as "no match" instead of raising KeyError.
        obj_uuid = i.get('uuid') or ''
        if collection_uuid_pattern.match(obj_uuid) or group_uuid_pattern.match(obj_uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(obj_uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name

        kind = i.get('kind')
        if kind and kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, if enabled, the full listing.

        The '.arvados#project' object file is created on first call.
        Nothing else happens until items() has requested a full listing.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return

        def samefn(a, i):
            # Decide whether existing entry `a` still represents record `i`.
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                # Taken while the llfuse lock is released; released in
                # the finally clause below.
                self._updating_lock.acquire()
                if not self.stale():
                    # Nothing to do once we hold the updating lock.
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().list,
                                                 self.num_retries,
                                                 filters=[["owner_uuid", "=", self.project_uuid],
                                                          ["group_class", "=", "project"]])
                contents.extend(arvados.util.list_all(self.api.collections().list,
                                                      self.num_retries,
                                                      filters=[["owner_uuid", "=", self.project_uuid]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the entry for record `i`, register it under `name`, return it."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named `k`, fetching it from the API if needed.

        Raises KeyError when no subproject or collection with that name
        is owned by this project.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)

        # No full listing yet: ask the API for just this one name,
        # subprojects first and collections second.
        with llfuse.lock_released:
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "=", "project"],
                                                       ["name", "=", k]],
                                              limit=1).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                ["name", "=", k]],
                                                       limit=1).execute(num_retries=self.num_retries)["items"]
        if contents:
            name = sanitize_filename(self.namefn(contents[0]))
            if name != k:
                # The record's sanitized name differs from the name asked for.
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if `k` is '.arvados#project' or resolves via self[k]."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user is listed in the project's 'writable_by'."""
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object["writable_by"]

    def persisted(self):
        """Always returns True."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called `name` owned by this project.

        Any API client error is logged and reported as FUSEError(EEXIST).
        """
        try:
            with llfuse.lock_released:
                self.api.collections().create(body={"owner_uuid": self.project_uuid,
                                                    "name": name,
                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection `name` from this project.

        Raises FUSEError with ENOENT if there is no such entry, EPERM if
        the entry is not a CollectionDirectory, and ENOTEMPTY if it still
        has entries.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)

        # Resolve the entry (and its uuid) once, while the llfuse lock is
        # still held, rather than indexing self again after releasing it.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        ent_uuid = ent.uuid()

        with llfuse.lock_released:
            self.api.collections().delete(uuid=ent_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection `name_old` from project directory `src` to `name_new` here.

        Raises FUSEError(EPERM) if `src` is not a ProjectDirectory, if the
        entry is not a CollectionDirectory, or if `name_new` already exists.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another are
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply a create/update/delete event for an object to this directory.

        Compares the entry name implied by the event's old attributes
        with the one implied by its new attributes and adds, renames or
        removes the corresponding entry when they differ.
        """
        properties = ev.get("properties") or {}
        # Work on copies so the caller's event dicts are left untouched.
        old_attrs = dict(properties.get("old_attributes") or {})
        new_attrs = dict(properties.get("new_attributes") or {})
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = sanitize_filename(self.namefn(old_attrs))
        new_name = sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if ev.get("object_kind") == "arvados#collection":
            if old_attrs.get("is_trashed"):
                # Was previously deleted
                old_name = None
            if new_attrs.get("is_trashed"):
                # Has been deleted
                new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    # Rename: keep the existing entry object under the new name.
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                # Removed from this directory and not re-added.
                self.inodes.del_entry(ent)
1020
1021
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Set up the directory and look up the current user via the API.

        `exclude` is accepted but not used by this class.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the `poll` argument is ignored and polling is always
        # enabled here -- presumably deliberate; confirm before changing.
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the list of owners of projects shared with the current user.

        A project is a "root" when its owner is neither the current user
        nor another project in the listing.  Each root's owner (user or
        group) becomes an entry; a root whose owner record could not be
        fetched is listed under its own name instead.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            # Fetch the owner records; an owner may be a user or a group.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if obr.get("name"):
                        contents[obr["name"]] = obr
                    #elif obr.get("username"):
                    #    contents[obr["username"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr


            for r in roots:
                # Owner record unavailable: expose the project itself.
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message; calling it with no
            # arguments raises TypeError and masks the original error.
            _logger.exception("arv-mount shared dir error")

    def want_event_subscribe(self):
        """Always returns True."""
        return True