14965: Fixes crunchstat test assertion
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from builtins import dict
9 import logging
10 import re
11 import time
12 import llfuse
13 import arvados
14 import apiclient
15 import functools
16 import threading
17 from apiclient import errors as apiclient_errors
18 import errno
19 import time
20
21 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
22 from .fresh import FreshBase, convertTime, use_counter, check_update
23
24 import arvados.collection
25 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
26
27 _logger = logging.getLogger('arvados.arvados_fuse')
28
29
# Pattern for the characters that neither FUSE nor Linux can accept inside
# a filename.  (Collection filenames containing them show up with
# underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')

# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
    """Return `dirty` with disallowed filename characters turned into "_".

    None is passed through unchanged.  The empty string and '.' both
    become '_', and '..' becomes '__'.  Any other name has each NUL or
    '/' character replaced by an underscore.
    """
    if dirty is None:
        return None
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
48
49
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    The mutating operations (create, mkdir, unlink, rmdir, rename) and
    want_event_subscribe raise NotImplementedError here; subclasses
    override the ones they support.
    """

    def __init__(self, parent_inode, inodes):
        """Set up an empty directory.

        :parent_inode: the integer inode number of the parent directory

        :inodes: the inode table object.  This class calls its touch(),
        add_entry(), del_entry(), invalidate_entry() and invalidate_inode()
        methods and indexes it by inode number.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and not
        propagated.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list of (name, entry) pairs for the current entries."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        If several items map to the same (sanitized) name, only the first
        one that yields an entry is used; later ones are skipped so that an
        already-registered entry is never overwritten without being
        removed from the inode table.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.  May return None, in which
        case no entry is added for that item.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if not name:
                continue
            if name in self._entries:
                # An earlier item already claimed this name.  Replacing it
                # here would orphan the entry that was just registered.
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries.pop(name)
            else:
                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in self._entries.values())

    def has_ref(self, only_children):
        """Return True if this directory or any of its entries has a reference.

        only_children is forwarded to the base class check; entries are
        always queried with has_ref(False).
        """
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in self._entries.values())

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for ent in oldentries.values():
            ent.clear()
            self.inodes.del_entry(ent)
        self.invalidate()

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k, v in viewitems(parent._entries):
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
231
232
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    Used directly for subcollections, and as the base class of
    CollectionDirectory, which adds loading and saving of Collection
    records.

    Most operations act only on the underlying Arvados `Collection`
    object.  That object reports additions, removals and modifications
    through a notify callback to `CollectionDirectoryBase.on_event`, and
    FUSE inodes and directory entries are created, deleted or invalidated
    in response to those events.

    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Register `item` under the sanitized form of `name`.

        An item that already carries a fuse_entry is re-attached (the
        entry must be marked dead and still have an inode).  Otherwise a
        subdirectory is created for RichCollectionBase items and a
        FuseArvadosFile for anything else.
        """
        name = sanitize_filename(name)
        existing = getattr(item, "fuse_entry", None)
        if existing is not None:
            if existing.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if existing.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            existing.dead = False
            self._entries[name] = existing
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = self.inodes.add_entry(
                CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name] = subdir
            subdir.populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(
                FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Notify callback: apply an ADD, DEL or MOD event to this directory.

        Events for a collection other than self.collection are ignored.
        The directory is modified while holding llfuse.lock.
        """
        if not (collection == self.collection):
            return

        name = sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries.pop(name)
                self.inodes.invalidate_entry(self, name)
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                attached = getattr(item, "fuse_entry", None)
                if attached is not None:
                    self.inodes.invalidate_inode(attached)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name])

    def populate(self, mtime):
        """Subscribe to collection events and add an entry for each item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for child_name, child in viewitems(self.collection):
            self.new_entry(child_name, child, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released meanwhile."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create `name` by opening it for writing and closing it again."""
        with llfuse.lock_released:
            handle = self.collection.open(name, "w")
            handle.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move src[name_old] to self[name_new], overwriting the target.

        Raises llfuse.FUSEError with EPERM when src is not a
        CollectionDirectoryBase, ENOTEMPTY when a directory would replace
        a non-empty directory, ENOTDIR when a directory would replace a
        file, and EISDIR when a file would replace a directory.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            moving = src[name_old]
            target = self[name_new]
            moving_is_file = isinstance(moving, FuseArvadosFile)
            moving_is_dir = isinstance(moving, CollectionDirectoryBase)
            target_is_file = isinstance(target, FuseArvadosFile)
            target_is_dir = isinstance(target, CollectionDirectoryBase)

            if moving_is_file and target_is_file:
                # file over file: nothing to check
                pass
            elif moving_is_dir and target_is_dir:
                if len(target) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_is_dir and target_is_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_is_file and target_is_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
353
354
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Set up the directory without loading the collection.

        :api: API client object; its `keep` attribute and `_rootDesc`
        discovery document are read by this class.

        :num_retries: passed through to the collection objects created in
        update().

        :collection_record: either a dict, whose 'uuid' becomes the
        locator and whose 'modified_at' sets the mtime, or a locator value
        that is stored as-is (may be None).

        :explicit_collection: accepted but not used by this constructor.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception as e:
            # Report the exception type directly; no need for sys.exc_info().
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", type(e))
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() works even when there is
        # no locator and no collection loaded yet.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` has this locator as uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Return True if the locator matches uuid_pattern (False when there is no locator)."""
        if self.collection_locator is None:
            return False
        return (uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are cleared first if this directory already has
        an inode.  A non-empty record updates the mtime, the locator and
        the collection record file (if one exists).
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection behind this directory.

        Returns True when the directory is up to date afterwards
        (including when nothing needed to be done), and False when an
        error occurred; in the error case the error is logged and the
        directory is invalidated.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Record already loaded and the locator is a portable
                # data hash: nothing to refresh.
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # No longer stale once we hold the update lock.
                        # Report success so callers that test the return
                        # value do not treat this as a failed update.
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        # Only reached after one of the handlers above ran.
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The record file is created and registered on first access.
        """
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable and stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the collection's threads, drop all entries, reset the size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
527
528
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This lets Keep serve as scratch space.  A userspace program can read
    the .arvados#collection file to obtain a current manifest, either to
    snapshot the scratch data or to use it as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        # save() and save_new() are overridden to do nothing.
        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries)
        super(TmpCollectionDirectory, self).__init__(parent_inode, inodes, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Apply the event, then invalidate the collection record file if any."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict from the collection's current state.

        The manifest text and portable data hash are computed with
        llfuse.lock released; "uuid" is always None.
        """
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
        return {
            "uuid": None,
            "manifest_text": manifest,
            "portable_data_hash": pdh,
        }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The record file is created and registered on first access.
        """
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            self.collection_record_file = FuncToJSONFile(
                self.inode, self.collection_record)
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        return False

    def writable(self):
        return True

    def want_event_subscribe(self):
        return False

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file (if any) and then this directory."""
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
600
601
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    A name that is looked up is checked to see whether it is a valid
    locator; if it is, the matching collection or project is loaded as a
    subdirectory of this directory, named after the locator.  Because
    listing every extant keep locator is impractical, readdir() only shows
    the entries that have already been accessed.

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # Only act when an inode has just been assigned and the directory
        # has no entries yet.
        if name != 'inode' or self.inode is None or self._entries:
            return
        # When we're assigned an inode, add a README.
        readme = StringFile(self.inode, self.README_TEXT, time.time())
        self._entries['README'] = self.inodes.add_entry(readme)
        # If we're the root directory, add an identical by_id subdirectory.
        if self.inode == llfuse.ROOT_INODE:
            self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if `k` is an entry or can be loaded as one.

        Names that are neither a portable data hash nor (unless pdh_only
        is set) a uuid are rejected without a lookup.  Any exception
        during the lookup is logged and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        entry = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                entry = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
            else:
                entry = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if not entry.update():
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(entry)
                return False

            if k in self._entries:
                # An entry for k appeared in the meantime; discard ours.
                self.inodes.del_entry(entry)
            else:
                self._entries[k] = entry
            return True
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if entry is not None:
                self.inodes.del_entry(entry)
            return False

    def __getitem__(self, item):
        if item not in self:
            raise KeyError("No collection with id " + item)
        return self._entries[item]

    def clear(self):
        pass

    def want_event_subscribe(self):
        return not self.pdh_only
692
693
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup in __getitem__; they are
        # appended to the listing on every update().
        self._extra = set()

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List tag names through the API and merge them into the entries.

        The API call is made with llfuse.lock released.
        """
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(i):
            return i['name']

        def same_tag(a, i):
            return a.tag == i['name']

        def make_tag_dir(i):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)

        listing = tags['items']+[{"name": n} for n in self._extra]
        self.merge(listing, tag_name, same_tag, make_tag_dir)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry for tag `item`, looking it up if not listed.

        A tag missing from the entries is queried through the API; if
        found, it is remembered in _extra and the directory is updated.
        The final lookup raises KeyError if the tag is still absent.
        """
        parent = super(TagsDirectory, self)
        if parent.__contains__(item):
            return parent.__getitem__(item)
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
        if tags["items"]:
            self._extra.add(item)
            self.update()
        return parent.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            return True
        # Fall back to __getitem__, which may look the tag up.
        try:
            self[k]
        except KeyError:
            return False
        return True
746
747
class TagDirectory(Directory):
    """A special directory whose subdirectories are the collections visible
    to the user that carry one particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def want_event_subscribe(self):
        return True

    @use_counter
    def update(self):
        """List the tag's collection links and merge them into the entries.

        The API call is made with llfuse.lock released.
        """
        link_filters = [['link_class', '=', 'tag'],
                        ['name', '=', self.tag],
                        ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=link_filters,
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)

        def entry_name(i):
            return i['head_uuid']

        def same_collection(a, i):
            return a.collection_locator == i['head_uuid']

        def make_collection_dir(i):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name, same_collection, make_collection_dir)
778
779
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project.

    Entries are the subprojects and collections owned by the project (or
    user) whose uuid is ``project_object['uuid']``.  Until a complete listing
    is requested through items(), entries are resolved lazily, one name at a
    time, by __getitem__.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Store the API client, retry count, project record and poll settings.

        `project_object` must at least contain a 'uuid' key.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # Created on the first update(); served as '.arvados#project'.
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes full-listing refreshes in update().
        self._updating_lock = threading.Lock()
        # Cached result of users().current(), filled in by writable().
        self._current_user = None
        # Becomes True once items() has been called.
        self._full_listing = False

    def want_event_subscribe(self):
        """Always returns True."""
        return True

    def createDirectory(self, i):
        """Build the entry object for API record `i`, chosen by its uuid type.

        Returns a CollectionDirectory for collections and for links that
        point at a collection, a ProjectDirectory for groups, an ObjectFile
        for any other recognised uuid, and None otherwise.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        """Return the uuid of the project this directory represents."""
        return self.project_uuid

    def items(self):
        """Return all entries, switching the directory to full-listing mode."""
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Return the (unsanitized) entry name for record `i`, or None.

        Collections, subprojects and name links to collections use their
        'name' as is.  Any other record with an 'arvados#...' kind is named
        "<name>.<kind without the 'arvados#' prefix>".  Records without a
        usable name, or matching none of these cases, yield None.

        Missing 'uuid', 'head_kind' or 'kind' keys are treated as "no match"
        instead of raising, so partial records (for example attribute
        snapshots delivered with events) are handled uniformly.
        """
        name = i.get('name')
        if not name:
            return None

        uuid = i.get('uuid') or ''
        if collection_uuid_pattern.match(uuid) or group_uuid_pattern.match(uuid):
            # collection or subproject
            return name
        if link_uuid_pattern.match(uuid) and i.get('head_kind') == 'arvados#collection':
            # name link
            return name

        kind = i.get('kind') or ''
        if kind.startswith('arvados#'):
            # something else
            return "{}.{}".format(name, kind[8:])
        return None

    @use_counter
    def update(self):
        """Refresh the project record and, in full-listing mode, the entries.

        Returns True when nothing needed listing or after a completed merge,
        and None when another refresh already made the directory fresh.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:
            return True

        def samefn(a, i):
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        # Track whether the lock was really taken: if entering
        # llfuse.lock_released fails, releasing an unheld threading.Lock in
        # the finally clause would raise RuntimeError and hide the real error.
        lock_held = False
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                lock_held = True
                if not self.stale():
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().list,
                                                 self.num_retries,
                                                 filters=[["owner_uuid", "=", self.project_uuid],
                                                          ["group_class", "=", "project"]])
                contents.extend(arvados.util.list_all(self.api.collections().list,
                                                      self.num_retries,
                                                      filters=[["owner_uuid", "=", self.project_uuid]]))

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       self.namefn,
                       samefn,
                       self.createDirectory)
            return True
        finally:
            if lock_held:
                self._updating_lock.release()

    def _add_entry(self, i, name):
        """Create the entry for record `i`, register it under `name`, return it."""
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    @use_counter
    @check_update
    def __getitem__(self, k):
        """Return the entry named `k`, looking it up on the API if necessary.

        '.arvados#project' maps to the project record file.  In full-listing
        mode, or when the name is already known, the inherited lookup is
        used.  Otherwise a subproject, then a collection, owned by this
        project and named `k` is searched for and added on success.

        Raises KeyError when nothing matches, or when the sanitized name of
        the record found differs from `k`.
        """
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "=", "project"],
                                                       ["name", "=", k]],
                                              limit=1).execute(num_retries=self.num_retries)["items"]
            if not contents:
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                ["name", "=", k]],
                                                       limit=1).execute(num_retries=self.num_retries)["items"]
        if contents:
            name = sanitize_filename(self.namefn(contents[0]))
            if name != k:
                raise KeyError(k)
            return self._add_entry(contents[0], name)

        # Didn't find item
        raise KeyError(k)

    def __contains__(self, k):
        """Return True if `k` is '.arvados#project' or ``self[k]`` succeeds."""
        if k == '.arvados#project':
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True

    @use_counter
    @check_update
    def writable(self):
        """Return True if the current user is listed in the project's 'writable_by'."""
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):
        """Always returns True."""
        return True

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create an empty collection called `name` owned by this project.

        Any API client error is logged and reported as
        llfuse.FUSEError(errno.EEXIST).
        """
        try:
            with llfuse.lock_released:
                self.api.collections().create(body={"owner_uuid": self.project_uuid,
                                                    "name": name,
                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
            self.invalidate()
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    @use_counter
    @check_update
    def rmdir(self, name):
        """Delete the empty collection `name` from this project.

        Raises llfuse.FUSEError with ENOENT if the name is unknown, EPERM if
        the entry is not a collection, and ENOTEMPTY if it has entries.
        """
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Resolve the entry once, before the llfuse lock is released, rather
        # than repeating the directory lookup for every check and again
        # inside the lock_released section.
        ent = self[name]
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(ent) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        collection_uuid = ent.uuid()
        with llfuse.lock_released:
            self.api.collections().delete(uuid=collection_uuid).execute(num_retries=self.num_retries)
        self.invalidate()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move collection `name_old` out of project `src` to `name_new` here.

        Raises llfuse.FUSEError(errno.EPERM) if `src` is not a
        ProjectDirectory, if the entry is not a CollectionDirectory, or if
        `name_new` already exists in this directory.
        """
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        ent = src[name_old]

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    @use_counter
    def child_event(self, ev):
        """Apply one change event `ev` about an object to this directory.

        The entry names before and after the change are derived from the
        event's old and new attributes; the entry is then added, renamed or
        removed as needed.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = sanitize_filename(self.namefn(old_attrs))
        new_name = sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
            old_name = None
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
            new_name = None

        if old_attrs.get("is_trashed"):
            # Was previously deleted
            old_name = None
        if new_attrs.get("is_trashed"):
            # Has been deleted
            new_name = None

        if new_name != old_name:
            ent = None
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)

            if new_name:
                if ent is not None:
                    # Renamed within this directory: keep the same object.
                    self._entries[new_name] = ent
                else:
                    self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
1033
1034
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me.

    Each entry is a ProjectDirectory.  Entries are keyed by the name of the
    owning user or group, or by the project's own name when its owner is not
    among the objects fetched.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Store the API client and retry count and fetch the current user.

        `exclude` and `poll` are accepted but not used by this constructor:
        polling is always switched on.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        # Serializes refreshes in update().
        self._updating_lock = threading.Lock()

    @use_counter
    def update(self):
        """Rebuild the listing of shared projects and their owners.

        Uses the groups 'shared' API method when the discovery document
        advertises it; otherwise works it out from full listings of
        projects, users and groups.  Any exception is logged and swallowed,
        leaving the current entries in place.
        """
        # Track whether the lock was really taken: if entering
        # llfuse.lock_released fails, releasing an unheld threading.Lock in
        # the finally clause would raise RuntimeError out of this method.
        lock_held = False
        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                lock_held = True
                if not self.stale():
                    return

                contents = {}
                roots = []
                root_owners = set()
                objects = {}

                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    # Page through the results in uuid order, resuming each
                    # request after the last uuid of the previous page.
                    page = []
                    while True:
                        resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page,
                                                        order="uuid",
                                                        limit=10000,
                                                        count="none",
                                                        include="owner_uuid").execute(num_retries=self.num_retries)
                        if not resp["items"]:
                            break
                        page = [["uuid", ">", resp["items"][-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                else:
                    all_projects = arvados.util.list_all(
                        self.api.groups().list, self.num_retries,
                        filters=[['group_class','=','project']],
                        select=["uuid", "owner_uuid"])
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A root is a project owned neither by the current user
                    # nor by another project in the listing.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    lusers = arvados.util.list_all(
                        self.api.users().list, self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.list_all(
                        self.api.groups().list, self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])

                    for user in lusers:
                        objects[user["uuid"]] = user
                    for group in lgroups:
                        objects[group["uuid"]] = group

                for r in root_owners:
                    if r in objects:
                        obr = objects[r]
                        if obr.get("name"):
                            contents[obr["name"]] = obr
                        #elif obr.get("username"):
                        #    contents[obr["username"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                for r in roots:
                    if r in objects:
                        obr = objects[r]
                        # Listed directly only when its owner was not fetched.
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            self.merge(viewitems(contents),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            _logger.exception("arv-mount shared dir error")
        finally:
            if lock_held:
                self._updating_lock.release()

    def want_event_subscribe(self):
        """Always returns True."""
        return True