Merge branch '12037-cwl-v1.0.1' closes #12037
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import logging
6 import re
7 import time
8 import llfuse
9 import arvados
10 import apiclient
11 import functools
12 import threading
13 from apiclient import errors as apiclient_errors
14 import errno
15 import time
16
17 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from fresh import FreshBase, convertTime, use_counter, check_update
19
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
22
_logger = logging.getLogger('arvados.arvados_fuse')


# Characters that FUSE or Linux cannot accept inside a single path
# component.  Collection filenames containing any of them are shown in
# the fuse mount with each offending character replaced by "_".
_disallowed_filename_characters = re.compile(r'[\x00/]')

# Whole names that must never be used verbatim: the empty string and the
# special "." / ".." directory entries.
# '.' and '..' are not reachable if API server is newer than #6277
_reserved_filename_replacements = {
    '': '_',
    '.': '_',
    '..': '__',
}


def sanitize_filename(dirty):
    """Return a harmless version of *dirty* for use as a FUSE filename.

    None is returned unchanged.  "", "." and ".." map to "_", "_" and "__"
    respectively.  Any other name has every NUL and "/" replaced by "_".
    """
    if dirty is None:
        return None
    replacement = _reserved_filename_replacements.get(dirty)
    if replacement is not None:
        return replacement
    return _disallowed_filename_characters.sub('_', dirty)
44
45
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory.

        :inodes: inode registry; this class uses its add_entry, del_entry,
        invalidate_entry, invalidate_inode and touch methods and its
        encoding attribute.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        HttpError raised by the API client is logged as a warning instead of
        being propagated.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode registry, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """True if this directory or any direct entry reports in_use()."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children):
        """True if this directory has a ref, or any direct entry has one."""
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[n])
        self.inodes.invalidate_inode(self.inode)
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate every entry name and this inode, recursing into entries."""
        for n, e in self._entries.items():
            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
            e.kernel_invalidate()
        self.inodes.invalidate_inode(self.inode)

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
222
223
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    Subcollections use this class directly; it is also the base class of
    CollectionDirectory, which adds loading/saving of Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    That object reports every added, removed or modified item through its
    notify callback, `CollectionDirectoryBase.on_event`, which in turn
    creates, deletes or invalidates the matching FUSE inodes and directory
    entries.
    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Install the FUSE entry for collection *item* under *name*.

        An item that already carries a (dead) fuse_entry is revived and
        reattached.  Otherwise a subdirectory is created for subcollections
        and a FuseArvadosFile for everything else.  The resulting entry is
        stored back on item.fuse_entry.

        Raises Exception if an existing fuse_entry is not dead or has no
        inode.
        """
        safe_name = sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            revived = item.fuse_entry
            if revived.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if revived.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            revived.dead = False
            self._entries[safe_name] = revived
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = self.inodes.add_entry(
                CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[safe_name] = subdir
            subdir.populate(mtime)
        else:
            self._entries[safe_name] = self.inodes.add_entry(
                FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[safe_name]

    def on_event(self, event, collection, name, item):
        """Mirror an ADD/DEL/MOD notification for self.collection into FUSE.

        Notifications for other collections are ignored.  The work is done
        while holding llfuse.lock.
        """
        if collection == self.collection:
            name = sanitize_filename(name)
            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    removed = self._entries.pop(name)
                    self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                    self.inodes.del_entry(removed)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        self.inodes.invalidate_inode(item.fuse_entry.inode)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Set the mtime, subscribe to collection events and add existing items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry_name, entry_item in self.collection.items():
            self.new_entry(entry_name, entry_item, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with the FUSE lock released."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file *name* in the collection."""
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def _remove_and_flush(self, name):
        # Shared by unlink() and rmdir(): the collection's remove() handles
        # both files and subcollections; the result is saved right away.
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def unlink(self, name):
        self._remove_and_flush(name)

    @use_counter
    @check_update
    def rmdir(self, name):
        self._remove_and_flush(name)

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move *name_old* from directory *src* to *name_new* here.

        Raises FUSEError EPERM if *src* is not collection-backed. If the
        target exists, it raises ENOTEMPTY when replacing a non-empty
        directory, ENOTDIR when moving a directory over a file, and EISDIR
        when moving a file over a directory.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            replaced = self[name_new]
            moving_is_dir = isinstance(moving, CollectionDirectoryBase)
            moving_is_file = isinstance(moving, FuseArvadosFile)
            replaced_is_dir = isinstance(replaced, CollectionDirectoryBase)
            replaced_is_file = isinstance(replaced, FuseArvadosFile)
            if moving_is_dir and replaced_is_dir:
                # A directory may only replace an empty directory.
                if len(replaced) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_is_dir and replaced_is_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_is_file and replaced_is_dir:
                raise llfuse.FUSEError(errno.EISDIR)
            # A file replacing a file is allowed as-is.

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
344
345
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up a collection root directory; contents are loaded by update().

        :collection_record: a collection record dict (its 'uuid' and
        'modified_at' are used), a collection locator string (uuid or
        portable data hash), or None.

        :explicit_collection: accepted for backward compatibility; it is not
        used by this class.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            # Poll at half of the server's blobSignatureTtl (default 2h).
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
        except Exception as e:
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", e)
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Default to read-only so writable() works even when there is no
        # locator; only collections addressed by uuid are writable.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """True if record *i* matches this directory's locator by uuid or PDH."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """True when the collection is addressed by uuid (False if no locator)."""
        return (self.collection_locator is not None and
                uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of *coll_reader*.

        Existing entries are cleared first if this directory has an inode.
        The record's modified_at/uuid update the mtime and locator, and the
        .arvados#collection file (if created) is refreshed.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        Returns True on success or when there is nothing to do; on error the
        problem is logged, the directory is invalidated and False is
        returned.  Called with llfuse.lock held; the lock is released while
        talking to the API server / Keep.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Already loaded by portable data hash: nothing to refresh.
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed the directory while we
                        # waited for _updating_lock; report success so
                        # callers that test the result do not treat it as
                        # a failure.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry *item*; '.arvados#collection' is created on demand."""
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then mark stale."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop collection threads, delete all entries and reset the size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
518
519
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that is never saved.

    Lets Keep be used as scratch space.  A userspace program can read the
    .arvados#collection file to obtain the current manifest, for example to
    save a snapshot of the scratch data or to use it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save() and save_new() do nothing."""
        def save(self):
            pass
        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the change as usual, then invalidate the record file, if any."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict (uuid None) describing the current contents."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
        return {
            "uuid": None,
            "manifest_text": manifest,
            "portable_data_hash": pdh,
        }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            # Created lazily; its content comes from collection_record().
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if created) and this directory."""
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
591
592
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """:pdh_only: if true, only portable data hashes (not uuids) are looked up."""
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if *k* is (or can be loaded as) a collection entry.

        Names that look like a portable data hash (or a uuid, unless
        pdh_only) are loaded via CollectionDirectory.update(); a successful
        load is cached in self._entries.  Any exception during the load is
        logged at debug level and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Stays None if constructing/registering the entry fails, so the
        # error handler below only releases an entry that actually exists.
        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    self.inodes.del_entry(e)
                return True
            else:
                # Encode like every other invalidate_entry call site.
                self.inodes.invalidate_entry(self.inode, k.encode(self.inodes.encoding))
                self.inodes.del_entry(e)
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the collection entry *item*; raise KeyError if it cannot be loaded."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        """No-op override of Directory.clear(); existing entries are kept."""
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
671
672
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by explicit lookup in __getitem__; they are always
        # included, even if absent from the limited listing in update().
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Rebuild the entries from up to 1000 distinct tag names plus _extra."""
        with llfuse.lock_released:
            listing = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        if "items" not in listing:
            return

        def make_tag_dir(record):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                record['name'], poll=self._poll, poll_time=self._poll_time)

        wanted = listing['items'] + [{"name": extra} for extra in self._extra]
        self.merge(wanted,
                   lambda record: record['name'],
                   lambda existing, record: existing.tag == record['name'],
                   make_tag_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return tag directory *item*, querying the API if it is not yet listed.

        Raises KeyError if no such tag exists.
        """
        base = super(TagsDirectory, self)
        if not base.__contains__(item):
            with llfuse.lock_released:
                found = self.api.links().list(
                    filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
                ).execute(num_retries=self.num_retries)
            if found["items"]:
                self._extra.add(item)
                self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
725
726
class TagDirectory(Directory):
    """A special directory whose subdirectories are all collections visible to
    the user that carry a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """Rebuild the entries from the tag links pointing at collections."""
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            links = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)

        def make_collection_dir(link):
            return CollectionDirectory(self.inode, self.inodes, self.api,
                                       self.num_retries, link['head_uuid'])

        self.merge(links['items'],
                   lambda link: link['head_uuid'],
                   lambda existing, link: existing.collection_locator == link['head_uuid'],
                   make_collection_dir)
757
758
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are subprojects (ProjectDirectory) and collections
    (CollectionDirectory) owned by ``project_uuid``.  The project record
    itself is exposed as the special entry ``.arvados#project``.

    A full listing is only fetched once items() has been called; before that,
    names are resolved one at a time by __getitem__.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes the API-fetch part of update(), which runs with the
        # llfuse lock released.
        self._updating_lock = threading.Lock()
        # Cached users().current() record, fetched lazily by writable().
        self._current_user = None
        # Set once items() has asked for a complete listing.
        self._full_listing = False

    def want_event_subscribe(self):
        return True

    def createDirectory(self, i):
        """Build the inode object for API record ``i``.

        Returns a CollectionDirectory for collections and collection links,
        a ProjectDirectory for groups, an ObjectFile for any other Arvados
        uuid, or None if ``i`` cannot be represented.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            # Event payloads may carry partial attributes, so don't assume
            # the head_* fields are present.
            if (i.get('head_kind') == 'arvados#collection' or
                    portable_data_hash_pattern.match(i.get('head_uuid') or '')):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file is an entry of this directory, so its parent is this
            # directory's inode, like every other child created here.
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def items(self):
        """Return all entries, switching this directory to full listings."""
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the filename for API record ``i``, or None to omit it.

        Collections and subprojects use their ``name``; links to collections
        use the link ``name``; any other Arvados object becomes
        ``"<name>.<kind without the 'arvados#' prefix>"``.  Records with no
        (or an empty) ``name`` are omitted.
        """
        if 'name' not in i:
            return None
        name = i['name']
        if not name:
            return None

        # Event payloads and partial records may lack some fields.
        uuid = i.get('uuid') or ''
        if collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name
        kind = i.get('kind')
        if kind and kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[len('arvados#'):])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, if requested, the full listing.

        Always ensures the ``.arvados#project`` file exists.  The listing of
        subprojects and collections is fetched only after items() has been
        called.  The fetch runs with the llfuse lock released and is
        serialized by ``_updating_lock``.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return

        def samefn(a, i):
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                # Another thread may have refreshed us while we waited.
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().list,
                                                 self.num_retries,
                                                 filters=[["owner_uuid", "=", self.project_uuid],
                                                          ["group_class", "=", "project"]])
                contents.extend(arvados.util.list_all(self.api.collections().list,
                                                      self.num_retries,
                                                      filters=[["owner_uuid", "=", self.project_uuid]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
        finally:
            self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create and register the inode for record ``i`` under ``name``.

        Returns the registered entry, or None (registering nothing) if
        createDirectory() cannot represent ``i``.
        """
        ent = self.createDirectory(i)
        if ent is None:
            return None
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named ``k``, querying the API on a cache miss.

        ``.arvados#project`` maps to the project record file.  If a full
        listing has been taken, or ``k`` is already known, the stored entry
        is returned.  Otherwise a subproject named ``k`` is looked up first,
        then a collection, and a match is registered via _add_entry().

        Raises KeyError if no matching, representable object exists.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "=", "project"],
                                                       ["name", "=", k]],
                                              limit=1).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                ["name", "=", k]],
                                                       limit=1).execute(num_retries=self.num_retries)["items"]
        if contents:
            name = sanitize_filename(self.namefn(contents[0]))
            # The API matched the raw name; only accept the result if its
            # sanitized filename is exactly what was requested.
            if name != k:
                raise KeyError(k)
            ent = self._add_entry(contents[0], name)
            if ent is not None:
                return ent

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if ``k`` names an entry, looking it up if needed."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user is in the project's ``writable_by``.

        A record without ``writable_by`` is treated as not writable.
        """
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
        return self._current_user["uuid"] in (self.project_object.get("writable_by") or [])

    def persisted(self):
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called ``name`` in this project.

        Any API error is logged and reported to FUSE as EEXIST.
        """
        try:
            with llfuse.lock_released:
                self.api.collections().create(body={"owner_uuid": self.project_uuid,
                                                    "name": name,
                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the collection ``name`` from this project.

        Raises FUSEError ENOENT if it doesn't exist, EPERM if it is not a
        collection, and ENOTEMPTY if the collection has entries.
        """
        try:
            ent = self[name]
        except KeyError:
            raise llfuse.FUSEError(errno.ENOENT)
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        # Resolve the uuid while still holding the llfuse lock.
        target_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=target_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection ``name_old`` from project ``src`` here as ``name_new``.

        Only collections moving between ProjectDirectory instances are
        supported.  Any other case, or an existing ``name_new``, raises
        FUSEError EPERM.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): unlike the other API calls in this class, this one is
        # made while holding the llfuse lock; confirm that is intentional
        # (releasing it would let child_event() race with the moves below).
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))

    @use_counter
    def child_event(self, ev):
        """Apply a create/update/delete event for an object in this project.

        Computes the object's old and new entry names from the event's
        attributes.  It then renames, adds or removes the corresponding
        entry.
        """
        properties = ev.get("properties") or {}
        # Copy, so the event payload (which other listeners may also see)
        # isn't mutated by adding "uuid" below.
        old_attrs = dict(properties.get("old_attributes") or {})
        new_attrs = dict(properties.get("new_attributes") or {})
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = sanitize_filename(self.namefn(old_attrs))
        new_name = sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if ev.get("object_kind") == "arvados#collection":
            if old_attrs.get("is_trashed"):
                # Was previously deleted
                old_name = None
            if new_attrs.get("is_trashed"):
                # Has been deleted
                new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries.pop(old_name)
                self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))

            if new_name:
                if ent is not None:
                    self._entries[new_name] = ent
                elif new_name not in self._entries:
                    # A lookup or listing refresh may already have added this
                    # object before the event arrived; adding it again would
                    # replace that entry and orphan its inode.
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1013
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me.

    A "root" is a readable project whose owner is neither the current user
    nor another readable project.  Each root's owner (user or group) becomes
    a ProjectDirectory entry when that owner record is readable.  Roots whose
    owner cannot be read appear directly under their own name.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): polling is always enabled here regardless of the
        # ``poll`` argument, and ``exclude`` is accepted but unused.
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the entries from the current set of shared root projects.

        All API queries run with the llfuse lock released.  Errors raised
        while merging entries are logged rather than propagated.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {ob['uuid']: ob for ob in all_projects}

            roots = []
            root_owners = set()
            for ob in all_projects:
                owner_uuid = ob['owner_uuid']
                if owner_uuid != self.current_user['uuid'] and owner_uuid not in objects:
                    roots.append(ob)
                    root_owners.add(owner_uuid)

            if root_owners:
                # Fetch whichever owners (users or groups) we can read.
                # Skipped when there are no roots, to avoid useless queries.
                owner_filters = [['uuid', 'in', list(root_owners)]]
                for list_fn in (self.api.users().list, self.api.groups().list):
                    for ob in arvados.util.list_all(list_fn, self.num_retries,
                                                    filters=owner_filters):
                        objects[ob['uuid']] = ob

            contents = {}
            for owner_uuid in root_owners:
                owner = objects.get(owner_uuid)
                if owner is None:
                    continue
                if owner.get("name"):
                    contents[owner["name"]] = owner
                elif "first_name" in owner:
                    # Users are named "<first_name> <last_name>".
                    contents[u"{} {}".format(owner["first_name"], owner["last_name"])] = owner

            # Roots whose owner could not be read are listed directly.
            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message; calling it without one
            # raised TypeError and hid the original error.
            _logger.exception("arv-mount shared dir error")

    def want_event_subscribe(self):
        return True